diff --git a/.vscode/settings.json b/.vscode/settings.json index 7143033c1..2c55951ae 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -29,6 +29,7 @@ "inheritDoc", "IWRR", "jaywcjlove", + "Jérôme", "lcov", "libuv", "loglevel", diff --git a/README.md b/README.md index f212ccd5e..f3e77799b 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ Please consult our [general guidelines](#general-guidelines). - Tasks stealing under back pressure ✔ - Tasks redistribution on worker error ✔ - Support for sync and async task function ✔ +- Support for abortable task function ✔ - Support for multiple task functions with per task function queuing priority and tasks distribution strategy ✔ - Support for task functions diff --git a/benchmarks/benchmarks-utils.mjs b/benchmarks/benchmarks-utils.mjs index 93c2303c4..31d84d811 100644 --- a/benchmarks/benchmarks-utils.mjs +++ b/benchmarks/benchmarks-utils.mjs @@ -258,14 +258,13 @@ const fibonacci = (n) => { const factorial = (n) => { if (n === 0 || n === 1) { return 1n - } else { - n = BigInt(n) - let factorial = 1n - for (let i = 1n; i <= n; i++) { - factorial *= i - } - return factorial } + n = BigInt(n) + let factorial = 1n + for (let i = 1n; i <= n; i++) { + factorial *= i + } + return factorial } const readWriteFiles = ( diff --git a/deno.json b/deno.json index 8b5c50cdb..2015bb4ab 100644 --- a/deno.json +++ b/deno.json @@ -3,9 +3,7 @@ "version": "0.4.31", "exports": "./src/mod.ts", "compilerOptions": { - "lib": [ - "deno.worker" - ], + "lib": ["deno.worker"], "strict": true }, "tasks": { @@ -26,9 +24,7 @@ "documentation": "deno doc ./src/mod.ts" }, "test": { - "include": [ - "./tests/**/*.test.mjs" - ] + "include": ["./tests/**/*.test.mjs"] }, "fmt": { "semiColons": false, @@ -41,18 +37,8 @@ "@std/testing": "jsr:@std/testing@^1.0.14" }, "publish": { - "include": [ - "LICENSE", - "README.md", - "deno.json", - "src/**/*.ts" - ] + "include": ["LICENSE", "README.md", "deno.json", "src/**/*.ts"] }, "lock": false, - "exclude": [ - "./coverage", - "./dist/browser", - "./dist/esm", - "./npm" - ] + "exclude": ["./coverage", "./dist/browser", "./dist/esm", "./npm"] } diff --git a/docs/api.md b/docs/api.md index 422822e99..c606db50e 100644 --- a/docs/api.md +++ b/docs/api.md @@ -5,8 +5,8 @@ - [Pool](#pool) - [`pool = new FixedThreadPool(numberOfThreads, fileURL, opts)`](#pool--new-fixedthreadpoolnumberofthreads-fileurl-opts) - [`pool = new DynamicThreadPool(min, max, fileURL, opts)`](#pool--new-dynamicthreadpoolmin-max-fileurl-opts) - - [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist) - - [`pool.mapExecute(data, name, transferList)`](#poolmapexecutedata-name-transferlist) + - [`pool.execute(data, name, abortSignal, transferList)`](#poolexecutedata-name-abortsignal-transferlist) + - [`pool.mapExecute(data, name, abortSignals, transferList)`](#poolmapexecutedata-name-abortsignals-transferlist) - [`pool.start()`](#poolstart) - [`pool.destroy()`](#pooldestroy) - [`pool.hasTaskFunction(name)`](#poolhastaskfunctionname) @@ -41,12 +41,13 @@ override it in your worker implementation).\ `fileURL` (mandatory) URL to a file with a worker implementation.\ `opts` (optional) An object with the pool options properties described below. -### `pool.execute(data, name, transferList)` +### `pool.execute(data, name, abortSignal, transferList)` `data` (optional) An object that you want to pass to your worker task function implementation.\ `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`\ +`abortSignal` (optional) An abort signal to abort the task function execution.\ `transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworker) implementation. @@ -54,12 +55,14 @@ implementation. This method is available on both pool implementations and returns a promise with the task function execution response. -### `pool.mapExecute(data, name, transferList)` +### `pool.mapExecute(data, name, abortSignals, transferList)` -`data` Iterable objects that you want to pass to your worker task function +`data` An iterable of objects that you want to pass to your worker task function implementation.\ `name` (optional) A string with the task function name that you want to execute on the worker. Default: `'default'`\ +`abortSignals` (optional) An iterable of AbortSignal to abort the matching +object task function execution.\ `transferList` (optional) An array of transferable objects that you want to transfer to your [`ThreadWorker`](#class-yourworker-extends-threadworker)) worker implementation. @@ -129,7 +132,6 @@ An object with these properties: - `workerChoiceStrategy` (optional) - The default worker choice strategy to use in this pool: - - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the @@ -159,7 +161,6 @@ An object with these properties: - `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.\ Properties: - - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` - `runTime` (optional) - Use the tasks @@ -195,7 +196,6 @@ An object with these properties: - `tasksQueueOptions` (optional) - The worker tasks queue options object to use in this pool.\ Properties: - - `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer. - `concurrency` (optional) - The maximum number of tasks that can be executed diff --git a/src/mod.ts b/src/mod.ts index 1cebefdb2..2dc329637 100644 --- a/src/mod.ts +++ b/src/mod.ts @@ -1,5 +1,5 @@ +export type { CircularBuffer } from './circular-buffer.ts' export type { AbstractPool } from './pools/abstract-pool.ts' -export { PoolEvents, PoolTypes } from './pools/pool.ts' export type { IPool, PoolEvent, @@ -8,7 +8,25 @@ export type { PoolType, TasksQueueOptions, } from './pools/pool.ts' -export { WorkerTypes } from './pools/worker.ts' +export { PoolEvents, PoolTypes } from './pools/pool.ts' +export type { + IWorkerChoiceStrategy, + Measurement, + MeasurementOptions, + MeasurementStatisticsRequirements, + StrategyPolicy, + TaskStatisticsRequirements, + WorkerChoiceStrategy, + WorkerChoiceStrategyOptions, +} from './pools/selection-strategies/selection-strategies-types.ts' +export { + Measurements, + WorkerChoiceStrategies, +} from './pools/selection-strategies/selection-strategies-types.ts' +export type { WorkerChoiceStrategiesContext } from './pools/selection-strategies/worker-choice-strategies-context.ts' +export { DynamicThreadPool } from './pools/thread/dynamic.ts' +export type { ThreadPoolOptions } from './pools/thread/fixed.ts' +export { FixedThreadPool } from './pools/thread/fixed.ts' export type { ErrorEventHandler, EventLoopUtilizationMeasurementStatistics, @@ -25,40 +43,9 @@ export type { WorkerType, WorkerUsage, } from './pools/worker.ts' -export { - Measurements, - WorkerChoiceStrategies, -} from './pools/selection-strategies/selection-strategies-types.ts' -export type { - IWorkerChoiceStrategy, - Measurement, - MeasurementOptions, - MeasurementStatisticsRequirements, - StrategyPolicy, - TaskStatisticsRequirements, - WorkerChoiceStrategy, - WorkerChoiceStrategyOptions, -} from './pools/selection-strategies/selection-strategies-types.ts' -export type { WorkerChoiceStrategiesContext } from './pools/selection-strategies/worker-choice-strategies-context.ts' -export { DynamicThreadPool } from './pools/thread/dynamic.ts' -export { FixedThreadPool } from './pools/thread/fixed.ts' -export type { ThreadPoolOptions } from './pools/thread/fixed.ts' -export type { AbstractWorker } from './worker/abstract-worker.ts' -export { ThreadWorker } from './worker/thread-worker.ts' -export { KillBehaviors } from './worker/worker-options.ts' -export type { - KillBehavior, - KillHandler, - WorkerOptions, -} from './worker/worker-options.ts' -export type { - TaskAsyncFunction, - TaskFunction, - TaskFunctionObject, - TaskFunctionOperationResult, - TaskFunctions, - TaskSyncFunction, -} from './worker/task-functions.ts' +export { WorkerTypes } from './pools/worker.ts' +export type { PriorityQueue } from './queues/priority-queue.ts' +export type { FixedQueueNode, IFixedQueue } from './queues/queue-types.ts' export type { MessageValue, PromiseResponseWrapper, @@ -69,7 +56,20 @@ export type { WorkerStatistics, Writable, } from './utility-types.ts' -export type { CircularBuffer } from './circular-buffer.ts' -export type { PriorityQueue } from './queues/priority-queue.ts' -export type { FixedQueueNode, IFixedQueue } from './queues/queue-types.ts' export { availableParallelism } from './utils.ts' +export type { AbstractWorker } from './worker/abstract-worker.ts' +export type { + TaskAsyncFunction, + TaskFunction, + TaskFunctionObject, + TaskFunctionOperationResult, + TaskFunctions, + TaskSyncFunction, +} from './worker/task-functions.ts' +export { ThreadWorker } from './worker/thread-worker.ts' +export type { + KillBehavior, + KillHandler, + WorkerOptions, +} from './worker/worker-options.ts' +export { KillBehaviors } from './worker/worker-options.ts' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 762f2d4f8..87ca7fa23 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -53,7 +53,6 @@ import { waitWorkerNodeEvents, } from './utils.ts' import { version } from './version.ts' -import { WorkerNode } from './worker-node.ts' import type { IWorker, IWorkerNode, @@ -61,6 +60,7 @@ import type { WorkerNodeEventDetail, WorkerType, } from './worker.ts' +import { WorkerNode } from './worker-node.ts' /** * Base class that implements some shared logic for all poolifier pools. @@ -219,15 +219,18 @@ export abstract class AbstractPool< throw new Error( 'Cannot instantiate a pool without specifying the number of workers', ) - } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) { + } + if (!Number.isSafeInteger(minimumNumberOfWorkers)) { throw new TypeError( 'Cannot instantiate a pool with a non safe integer number of workers', ) - } else if (minimumNumberOfWorkers < 0) { + } + if (minimumNumberOfWorkers < 0) { throw new RangeError( 'Cannot instantiate a pool with a negative number of workers', ) - } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) { + } + if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) { throw new RangeError('Cannot instantiate a fixed pool with zero worker') } } @@ -544,7 +547,8 @@ export abstract class AbstractPool< private checkMessageWorkerId(message: MessageValue): void { if (message.workerId == null) { throw new Error('Worker message received without worker id') - } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) { + } + if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) { throw new Error( `Worker message received from unknown worker '${message.workerId.toString()}': ${ JSON.stringify( @@ -826,7 +830,7 @@ export abstract class AbstractPool< message: MessageValue, ): Promise { return await new Promise((resolve, reject) => { - const responsesReceived = new Array>() + const responsesReceived: MessageValue[] = [] const taskFunctionOperationsListener = ( message: MessageValue, ): void => { @@ -952,6 +956,19 @@ export abstract class AbstractPool< return [] } + private readonly getAbortError = ( + taskName: string, + taskId: `${string}-${string}-${string}-${string}-${string}`, + ): Error => { + const abortError = this.promiseResponseMap.get(taskId)?.abortSignal + ?.reason as Error | string + return abortError instanceof Error + ? abortError + : typeof abortError === 'string' + ? new Error(abortError) + : new Error(`Task '${taskName}' id '${taskId}' aborted`) + } + /** * Gets task function worker choice strategy, if any. * @@ -1065,12 +1082,14 @@ export abstract class AbstractPool< public async internalExecute( data?: Data, name?: string, + abortSignal?: AbortSignal, transferList?: Transferable[], ): Promise { return await new Promise((resolve, reject) => { const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { + abortable: abortSignal != null, name: name ?? DEFAULT_TASK_NAME, data: data ?? ({} as Data), priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), @@ -1082,6 +1101,26 @@ export abstract class AbstractPool< timestamp, taskId: crypto.randomUUID(), } + abortSignal?.addEventListener( + 'abort', + () => { + this.workerNodes[workerNodeKey]?.dispatchEvent( + new CustomEvent('abortTask', { + detail: { + taskId: task.taskId, + workerId: this.getWorkerInfo(workerNodeKey)!.id!, + }, + }), + ) + }, + { once: true }, + ) + this.promiseResponseMap.set(task.taskId!, { + reject, + resolve, + workerNodeKey, + abortSignal, + }) if ( this.opts.enableTasksQueue === false || (this.opts.enableTasksQueue === true && @@ -1091,13 +1130,6 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - queueMicrotask(() => { - this.promiseResponseMap.set(task.taskId!, { - resolve, - reject, - workerNodeKey, - }) - }) }) } @@ -1105,6 +1137,7 @@ export abstract class AbstractPool< public async execute( data?: Data, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[], ): Promise { if (!this.started) { @@ -1119,16 +1152,20 @@ export abstract class AbstractPool< if (name != null && typeof name === 'string' && name.trim().length === 0) { throw new TypeError('name argument must not be an empty string') } + if (abortSignal != null && !(abortSignal instanceof AbortSignal)) { + throw new TypeError('abortSignal argument must be an AbortSignal') + } if (transferList != null && !Array.isArray(transferList)) { throw new TypeError('transferList argument must be an array') } - return await this.internalExecute(data, name, transferList) + return await this.internalExecute(data, name, abortSignal, transferList) } /** @inheritDoc */ public async mapExecute( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[], ): Promise { if (!this.started) { @@ -1149,15 +1186,42 @@ export abstract class AbstractPool< if (name != null && typeof name === 'string' && name.trim().length === 0) { throw new TypeError('name argument must not be an empty string') } - if (transferList != null && !Array.isArray(transferList)) { - throw new TypeError('transferList argument must be an array') - } if (!Array.isArray(data)) { data = [...data] } + if (abortSignals != null) { + if (typeof abortSignals[Symbol.iterator] !== 'function') { + throw new TypeError('abortSignals argument must be an iterable') + } + for (const abortSignal of abortSignals) { + if (!(abortSignal instanceof AbortSignal)) { + throw new TypeError( + 'abortSignals argument must be an iterable of AbortSignal', + ) + } + } + if (!Array.isArray(abortSignals)) { + abortSignals = [...abortSignals] + } + if ((data as Data[]).length !== (abortSignals as AbortSignal[]).length) { + throw new Error( + 'data and abortSignals arguments must have the same length', + ) + } + } + if (transferList != null && !Array.isArray(transferList)) { + throw new TypeError('transferList argument must be an array') + } + const tasks: [Data, AbortSignal | undefined][] = Array.from( + { length: (data as Data[]).length }, + (_, i) => [ + (data as Data[])[i], + abortSignals != null ? (abortSignals as AbortSignal[])[i] : undefined, + ], + ) return await Promise.all( - (data as Data[]).map((data) => - this.internalExecute(data, name, transferList) + tasks.map(([data, abortSignal]) => + this.internalExecute(data, name, abortSignal, transferList) ), ) } @@ -1679,6 +1743,10 @@ export abstract class AbstractPool< ) } } + this.workerNodes[workerNodeKey].addEventListener( + 'abortTask', + this.abortTask as EventListener, + ) } /** @@ -1818,9 +1886,11 @@ export abstract class AbstractPool< !sourceWorkerNode.info.ready || sourceWorkerNode.info.stolen || sourceWorkerNode.info.stealing || + sourceWorkerNode.info.queuedTaskAbortion || !destinationWorkerNode.info.ready || destinationWorkerNode.info.stolen || - destinationWorkerNode.info.stealing + destinationWorkerNode.info.stealing || + destinationWorkerNode.info.queuedTaskAbortion ) { return } @@ -1843,7 +1913,6 @@ export abstract class AbstractPool< (this.info.stealingWorkerNodes ?? 0) > Math.ceil( this.workerNodes.length * - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.opts.tasksQueueOptions!.tasksStealingRatio!, ) ) @@ -2018,7 +2087,7 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse(message: MessageValue): void { - const { taskId, workerError, data } = message + const { name, taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { const { resolve, reject, workerNodeKey } = promiseResponse @@ -2027,7 +2096,12 @@ export abstract class AbstractPool< this.eventTarget?.dispatchEvent( new ErrorEvent(PoolEvents.taskError, { error: workerError }), ) - reject(workerError.error) + const { aborted, error } = workerError + let wError: Error = error + if (aborted) { + wError = this.getAbortError(name!, taskId!) + } + reject(wError) } else { resolve(data!) } @@ -2157,6 +2231,42 @@ export abstract class AbstractPool< return workerNode } + private readonly abortTask = ( + event: CustomEvent, + ): void => { + if (!this.started || this.destroying) { + return + } + const { taskId, workerId } = event.detail + const promiseResponse = this.promiseResponseMap.get(taskId!) + if (promiseResponse == null) { + return + } + const { abortSignal, reject } = promiseResponse + if (abortSignal?.aborted === false) { + return + } + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] + if (!workerNode.info.ready) { + return + } + if (this.opts.enableTasksQueue === true) { + for (const task of workerNode.tasksQueue) { + const { abortable, name } = task + if (taskId === task.taskId && abortable === true) { + workerNode.info.queuedTaskAbortion = true + workerNode.deleteTask(task) + this.promiseResponseMap.delete(taskId!) + workerNode.info.queuedTaskAbortion = false + reject(this.getAbortError(name!, taskId!)) + return + } + } + } + this.sendToWorker(workerNodeKey, { taskId, taskOperation: 'abort' }) + } + /** * Adds the given worker node in the pool worker nodes. * @@ -2219,8 +2329,9 @@ export abstract class AbstractPool< * @param task - The task to execute. */ private executeTask(workerNodeKey: number, task: Task): void { + const { transferList } = task this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker(workerNodeKey, task, task.transferList) + this.sendToWorker(workerNodeKey, task, transferList) this.checkAndEmitTaskExecutionEvents() } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index c6d249c50..1a95bed61 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -278,12 +278,14 @@ export interface IPool< * * @param data - The optional task input data for the specified task function. This can only be structured-cloneable data. * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed. - * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's web worker and they should not be used in the main thread afterwards. + * @param abortSignal - The optional AbortSignal to abort the task. + * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's web worker and they should not be used in the main thread afterwards. * @returns Promise with a task function response that will be fulfilled when the task is completed. */ readonly execute: ( data?: Data, name?: string, + abortSignal?: AbortSignal, transferList?: readonly Transferable[], ) => Promise /** @@ -291,12 +293,14 @@ export interface IPool< * * @param data - The tasks iterable input data for the specified task function. This can only be an iterable of structured-cloneable data. * @param name - The optional name of the task function to execute. If not specified, the default task function will be executed. - * @param transferList - An optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. + * @param abortSignals - The optional iterable of AbortSignal to abort the tasks iterable. + * @param transferList - The optional array of transferable objects to transfer ownership of. Ownership of the transferred objects is given to the chosen pool's worker_threads worker and they should not be used in the main thread afterwards. * @returns Promise with an array of task function responses that will be fulfilled when the tasks are completed. */ readonly mapExecute: ( data: Iterable, name?: string, + abortSignals?: Iterable, transferList?: readonly Transferable[], ) => Promise /** diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 783b48425..29bcd346f 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -112,7 +112,7 @@ export abstract class AbstractWorkerChoiceStrategy< * @returns Whether the worker node is ready or not. */ protected isWorkerNodeReady(workerNodeKey: number): boolean { - return this.pool.workerNodes[workerNodeKey]?.info?.ready ?? false + return this.pool.workerNodes[workerNodeKey]?.info.ready ?? false } /** @@ -138,8 +138,8 @@ export abstract class AbstractWorkerChoiceStrategy< */ protected getWorkerNodeTaskRunTime(workerNodeKey: number): number { return this.taskStatisticsRequirements.runTime.median - ? (this.pool.workerNodes[workerNodeKey].usage.runTime.median ?? 0) - : (this.pool.workerNodes[workerNodeKey].usage.runTime.average ?? 0) + ? (this.pool.workerNodes[workerNodeKey]?.usage.runTime.median ?? 0) + : (this.pool.workerNodes[workerNodeKey]?.usage.runTime.average ?? 0) } /** @@ -152,8 +152,8 @@ export abstract class AbstractWorkerChoiceStrategy< */ protected getWorkerNodeTaskWaitTime(workerNodeKey: number): number { return this.taskStatisticsRequirements.waitTime.median - ? (this.pool.workerNodes[workerNodeKey].usage.waitTime.median ?? 0) - : (this.pool.workerNodes[workerNodeKey].usage.waitTime.average ?? 0) + ? (this.pool.workerNodes[workerNodeKey]?.usage.waitTime.median ?? 0) + : (this.pool.workerNodes[workerNodeKey]?.usage.waitTime.average ?? 0) } // /** @@ -166,8 +166,8 @@ export abstract class AbstractWorkerChoiceStrategy< // */ // protected getWorkerNodeTaskElu(workerNodeKey: number): number { // return this.taskStatisticsRequirements.elu.median - // ? this.pool.workerNodes[workerNodeKey].usage.elu.active.median ?? 0 - // : this.pool.workerNodes[workerNodeKey].usage.elu.active.average ?? 0 + // ? this.pool.workerNodes[workerNodeKey]?.usage.elu.active.median ?? 0 + // : this.pool.workerNodes[workerNodeKey]?.usage.elu.active.average ?? 0 // } /** diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts index b8fdb7b15..019fb3809 100644 --- a/src/pools/selection-strategies/selection-strategies-utils.ts +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -54,8 +54,8 @@ const cpusCycleTimeWeight = (cpus: { speed: number }[]): number => { for (const cpu of cpus) { // CPU estimated cycle time const numberOfDigits = cpu.speed.toString().length - 1 - const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) - cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) + const cpuCycleTime = 1 / (cpu.speed / 10 ** numberOfDigits) + cpusCycleTimeWeight += cpuCycleTime * 10 ** numberOfDigits } return Math.round(cpusCycleTimeWeight / cpus.length) } diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 86da95e64..9ed56d33f 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -49,7 +49,7 @@ export const getDefaultTasksQueueOptions = ( poolMaxSize: number, ): Required> => { return Object.freeze({ - size: Math.pow(poolMaxSize, 2), + size: poolMaxSize ** 2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true, @@ -75,19 +75,23 @@ export const checkDynamicPoolSize = ( throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size', ) - } else if (!Number.isSafeInteger(max)) { + } + if (!Number.isSafeInteger(max)) { throw new TypeError( 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size', ) - } else if (min > max) { + } + if (min > max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size', ) - } else if (max === 0) { + } + if (max === 0) { throw new RangeError( 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero', ) - } else if (min === max) { + } + if (min === max) { throw new RangeError( 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead', ) @@ -442,6 +446,7 @@ export const initWorkerInfo = (worker: IWorker): WorkerInfo => { backPressure: false, backPressureStealing: false, continuousStealing: false, + queuedTaskAbortion: false, dynamic: false, id: getWorkerId(worker), ready: false, diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 442086dc1..a2daf3062 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -39,8 +39,9 @@ export class WorkerNode /** @inheritdoc */ public messageChannel?: MessageChannel /** @inheritdoc */ + public readonly tasksQueue: PriorityQueue> + /** @inheritdoc */ public tasksQueueBackPressureSize: number - private readonly tasksQueue: PriorityQueue> private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map @@ -143,6 +144,11 @@ export class WorkerNode this.tasksQueue.clear() } + /** @inheritdoc */ + public deleteTask(task: Task): boolean { + return this.tasksQueue.delete(task) + } + /** * Whether the worker node is back pressured or not. * diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 37b7f3e53..90de9f603 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -1,4 +1,5 @@ import type { CircularBuffer } from '../circular-buffer.ts' +import type { PriorityQueue } from '../queues/priority-queue.ts' import type { Task, TaskFunctionProperties } from '../utility-types.ts' /** @@ -138,6 +139,11 @@ export interface WorkerInfo { * Dynamic flag. */ dynamic: boolean + /** + * Queued task abortion flag. + * This flag is set to `true` when worker node is aborting a queued task. + */ + queuedTaskAbortion: boolean /** * Ready flag. */ @@ -297,6 +303,10 @@ export interface IWorkerNode * This is used to store data that are specific to the worker choice strategy. */ strategyData?: StrategyData + /** + * Tasks queue. + */ + readonly tasksQueue: PriorityQueue> /** * Message channel (worker thread only). */ @@ -341,6 +351,12 @@ export interface IWorkerNode * Clears tasks queue. */ readonly clearTasksQueue: () => void + /** + * Deletes a task from the tasks queue. + * @param task - The task to delete. + * @returns `true` if the task was deleted, `false` otherwise. + */ + readonly deleteTask: (task: Task) => boolean /** * Terminates the worker node. */ @@ -367,6 +383,7 @@ export interface IWorkerNode * @internal */ export interface WorkerNodeEventDetail { + taskId?: `${string}-${string}-${string}-${string}-${string}` workerId?: `${string}-${string}-${string}-${string}-${string}` workerNodeKey?: number } diff --git a/src/queues/abstract-fixed-queue.ts b/src/queues/abstract-fixed-queue.ts index 68b2111ac..3f9537da7 100644 --- a/src/queues/abstract-fixed-queue.ts +++ b/src/queues/abstract-fixed-queue.ts @@ -57,6 +57,18 @@ export abstract class AbstractFixedQueue implements IFixedQueue { return this.nodeArray[index].data } + /** @inheritdoc */ + public delete(data: T): boolean { + const index = this.nodeArray.findIndex((node) => node?.data === data) + if (index !== -1) { + this.nodeArray.splice(index, 1) + this.nodeArray.length = this.capacity + --this.size + return true + } + return false + } + /** @inheritdoc */ public dequeue(): T | undefined { if (this.empty()) { diff --git a/src/queues/priority-queue.ts b/src/queues/priority-queue.ts index 17326266b..41ac42ea2 100644 --- a/src/queues/priority-queue.ts +++ b/src/queues/priority-queue.ts @@ -133,6 +133,36 @@ export class PriorityQueue { return size } + /** + * Deletes the given data from the priority queue. + * @param data - Data to delete. + * @returns `true` if the data was deleted, `false` otherwise. + */ + public delete(data: T): boolean { + let node: PriorityQueueNode | undefined = this.tail + let prev: PriorityQueueNode | undefined + while (node != null) { + if (node.delete(data)) { + if (node.empty()) { + if (node === this.tail && node.next != null) { + this.tail = node.next + delete node.next + } else if (node.next != null && prev != null) { + prev.next = node.next + delete node.next + } else if (node.next == null && prev != null) { + delete prev.next + this.head = prev + } + } + return true + } + prev = node + node = node.next + } + return false + } + /** * Dequeue data from the priority queue. * @@ -165,7 +195,8 @@ export class PriorityQueue { node.next = tail!.next delete tail!.next break - } else if (node.next === tail && tail!.next == null) { + } + if (node.next === tail && tail!.next == null) { delete node.next this.head = node break diff --git a/src/queues/queue-types.ts b/src/queues/queue-types.ts index 83d7f7f29..2049bc476 100644 --- a/src/queues/queue-types.ts +++ b/src/queues/queue-types.ts @@ -59,6 +59,12 @@ export interface IFixedQueue { * Clears the fixed queue. */ clear: () => void + /** + * Deletes the given data from the fixed priority queue. + * @param data - Data to delete. + * @returns `true` if the data was deleted, `false` otherwise. + */ + delete: (data: T) => boolean /** * Returns an iterator for the fixed queue. * @returns An iterator for the fixed queue. diff --git a/src/utility-types.ts b/src/utility-types.ts index 2b4dc4d18..69f9c193f 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -7,6 +7,10 @@ import type { KillBehavior } from './worker/worker-options.ts' * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data. */ export interface WorkerError { + /** + * Whether the error is an abort error or not. + */ + readonly aborted: boolean /** * Data triggering the error. */ @@ -95,6 +99,10 @@ export interface TaskFunctionProperties { * @internal */ export interface Task { + /** + * Whether the task is abortable or not. + */ + readonly abortable?: boolean /** * Task name. */ @@ -175,6 +183,11 @@ export interface MessageValue * Task function properties. */ readonly taskFunctionsProperties?: TaskFunctionProperties[] + /** + * Task operation: + * - `'abort'` - Abort a task. + */ + readonly taskOperation?: 'abort' /** * Whether the worker computes the given statistics or not. */ @@ -200,6 +213,10 @@ export interface MessageValue * @internal */ export interface PromiseResponseWrapper { + /** + * The task abort signal. + */ + readonly abortSignal?: AbortSignal /** * Resolve callback to fulfill the promise. */ diff --git a/src/utils.ts b/src/utils.ts index 3d4f7e106..a1c0c72ed 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -49,7 +49,7 @@ export const exponentialDelay = ( retryNumber = 0, delayFactor = 100, ): number => { - const delay = Math.pow(2, retryNumber) * delayFactor + const delay = 2 ** retryNumber * delayFactor const randomSum = delay * 0.2 * secureRandom() // 0-20% of the delay return delay + randomSum } @@ -64,7 +64,8 @@ export const exponentialDelay = ( export const average = (dataSet: number[]): number => { if (Array.isArray(dataSet) && dataSet.length === 0) { return 0 - } else if (Array.isArray(dataSet) && dataSet.length === 1) { + } + if (Array.isArray(dataSet) && dataSet.length === 1) { return dataSet[0] } return ( @@ -83,7 +84,8 @@ export const average = (dataSet: number[]): number => { export const median = (dataSet: number[]): number => { if (Array.isArray(dataSet) && dataSet.length === 0) { return 0 - } else if (Array.isArray(dataSet) && dataSet.length === 1) { + } + if (Array.isArray(dataSet) && dataSet.length === 1) { return dataSet[0] } const sortedDataSet = dataSet.slice().sort((a, b) => a - b) @@ -104,7 +106,7 @@ export const median = (dataSet: number[]): number => { * @internal */ export const round = (num: number, scale = 2): number => { - const rounder = Math.pow(10, scale) + const rounder = 10 ** scale return Math.round(num * rounder * (1 + Number.EPSILON)) / rounder } diff --git a/src/worker/abort-error.ts b/src/worker/abort-error.ts new file mode 100644 index 000000000..9806d3aa6 --- /dev/null +++ b/src/worker/abort-error.ts @@ -0,0 +1,6 @@ +export class AbortError extends Error { + public constructor(message: string) { + super(message) + this.name = 'AbortError' + } +} diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index 8877c7bbb..9452123c6 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -12,6 +12,7 @@ import { isAsyncFunction, isPlainObject, } from '../utils.ts' +import { AbortError } from './abort-error.ts' import type { TaskAsyncFunction, TaskFunction, @@ -73,6 +74,13 @@ export abstract class AbstractWorker< * Performance statistics computation requirements. */ protected statistics?: WorkerStatistics + /** + * Task abort functions processed by the worker when task operation 'abort' is received.Add commentMore actions + */ + protected taskAbortFunctions: Map< + `${string}-${string}-${string}-${string}-${string}`, + () => void + > /** * Handler id of the `activeInterval` worker activity check. */ @@ -95,6 +103,10 @@ export abstract class AbstractWorker< throw new Error('isMain parameter is mandatory') } this.checkTaskFunctions(taskFunctions) + this.taskAbortFunctions = new Map< + `${string}-${string}-${string}-${string}-${string}`, + () => void + >() this.checkWorkerOptions(this.opts) if (!this.isMain) { this.getMainWorker().addEventListener( @@ -330,10 +342,17 @@ export abstract class AbstractWorker< protected messageEventListener( messageEvent: MessageEvent>, ): void { - const { data } = messageEvent - this.checkMessageWorkerId(data) - const { statistics, checkActive, taskFunctionOperation, taskId, kill } = - data + const { data: messageData } = messageEvent + this.checkMessageWorkerId(messageData) + const { + data, + statistics, + checkActive, + taskFunctionOperation, + taskId, + taskOperation, + kill, + } = messageData if (statistics != null) { // Statistics message received this.statistics = statistics @@ -342,13 +361,18 @@ export abstract class AbstractWorker< checkActive ? this.startCheckActive() : this.stopCheckActive() } else if (taskFunctionOperation != null) { // Task function operation message received - this.handleTaskFunctionOperationMessage(data) + this.handleTaskFunctionOperationMessage(messageData) } else if (taskId != null && data != null) { // Task message received - this.run(data) + this.run(messageData) + } else if (taskOperation === 'abort' && taskId != null) { + // Abort task operation message received + if (this.taskAbortFunctions.has(taskId)) { + this.taskAbortFunctions.get(taskId)?.() + } } else if (kill === true) { // Kill message received - this.handleKillMessage(data) + this.handleKillMessage(messageData) } } @@ -403,6 +427,7 @@ export abstract class AbstractWorker< ...(!status && error != null && { workerError: { + aborted: error instanceof AbortError, error, name: taskFunctionProperties.name, }, @@ -444,13 +469,43 @@ export abstract class AbstractWorker< private checkMessageWorkerId(message: MessageValue): void { if (message.workerId == null) { throw new Error('Message worker id is not set') - } else if (message.workerId !== this.id) { + } + if (message.workerId !== this.id) { throw new Error( `Message worker id ${message.workerId.toString()} does not match the worker id ${this.id}`, ) } } + /** + * Gets abortable task function. + * An abortable promise is built to permit the task to be aborted. + * @param name - The name of the task. + * @param taskId - The task id. + * @returns The abortable task function. + */ + private getAbortableTaskFunction( + name: string, + taskId: `${string}-${string}-${string}-${string}-${string}`, + ): TaskAsyncFunction { + return async (data?: Data): Promise => + await new Promise( + (resolve, reject: (reason?: unknown) => void) => { + this.taskAbortFunctions.set(taskId, () => { + reject(new AbortError(`Task '${name}' id '${taskId}' aborted`)) + }) + const taskFunction = this.taskFunctions.get(name)?.taskFunction + if (isAsyncFunction(taskFunction)) { + ;(taskFunction as TaskAsyncFunction)(data) + .then(resolve) + .catch(reject) + } else { + resolve((taskFunction as TaskSyncFunction)(data)) + } + }, + ) + } + /** * Starts the worker check active interval. */ @@ -521,11 +576,12 @@ export abstract class AbstractWorker< * @param task - The task to execute. */ protected readonly run = (task: Task): void => { - const { name, taskId, data } = task + const { abortable, name, taskId, data } = task const taskFunctionName = name ?? DEFAULT_TASK_NAME if (!this.taskFunctions.has(taskFunctionName)) { this.sendToMainWorker({ workerError: { + aborted: false, data, error: new Error(`Task function '${name!}' not found`), name, @@ -534,7 +590,12 @@ export abstract class AbstractWorker< }) return } - const fn = this.taskFunctions.get(taskFunctionName)?.taskFunction + let fn: TaskFunction + if (abortable === true) { + fn = this.getAbortableTaskFunction(taskFunctionName, taskId!) + } else { + fn = this.taskFunctions.get(taskFunctionName)!.taskFunction + } if (isAsyncFunction(fn)) { this.runAsync(fn as TaskAsyncFunction, task) } else { @@ -552,7 +613,7 @@ export abstract class AbstractWorker< fn: TaskSyncFunction, task: Task, ): void => { - const { name, taskId, data } = task + const { abortable, name, taskId, data } = task try { let taskPerformance = this.beginTaskPerformance(name) const res = fn(data) @@ -565,6 +626,7 @@ export abstract class AbstractWorker< } catch (error) { this.sendToMainWorker({ workerError: { + aborted: error instanceof AbortError, data, error: error as Error, name, @@ -573,6 +635,9 @@ export abstract class AbstractWorker< }) } finally { this.updateLastTaskTimestamp() + if (abortable === true) { + this.taskAbortFunctions.delete(taskId!) + } } } @@ -586,7 +651,7 @@ export abstract class AbstractWorker< fn: TaskAsyncFunction, task: Task, ): void => { - const { name, taskId, data } = task + const { abortable, name, taskId, data } = task let taskPerformance = this.beginTaskPerformance(name) fn(data) .then((res) => { @@ -600,6 +665,7 @@ export abstract class AbstractWorker< .catch((error) => { this.sendToMainWorker({ workerError: { + aborted: error instanceof AbortError, data, error: error as Error, name, @@ -609,6 +675,9 @@ export abstract class AbstractWorker< }) .finally(() => { this.updateLastTaskTimestamp() + if (abortable === true) { + this.taskAbortFunctions.delete(taskId!) + } }) .catch(EMPTY_FUNCTION) } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 4d4988b01..94cfd4cc2 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -262,7 +262,7 @@ describe({ enableTasksQueue: true, tasksQueueOptions: { concurrency: 2, - size: Math.pow(numberOfWorkers, 2), + size: numberOfWorkers ** 2, taskStealing: true, tasksStealingOnBackPressure: true, tasksStealingRatio: 0.6, @@ -627,7 +627,7 @@ describe({ expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: Math.pow(numberOfWorkers, 2), + size: numberOfWorkers ** 2, taskStealing: true, tasksStealingOnBackPressure: true, tasksStealingRatio: 0.6, @@ -637,7 +637,7 @@ describe({ expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, - size: Math.pow(numberOfWorkers, 2), + size: numberOfWorkers ** 2, taskStealing: true, tasksStealingOnBackPressure: true, tasksStealingRatio: 0.6, @@ -657,7 +657,7 @@ describe({ ) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: Math.pow(numberOfWorkers, 2), + size: numberOfWorkers ** 2, taskStealing: true, tasksStealingOnBackPressure: true, tasksStealingRatio: 0.6, @@ -888,6 +888,7 @@ describe({ continuousStealing: false, backPressureStealing: false, backPressure: false, + queuedTaskAbortion: false, }) } await pool.destroy() @@ -908,6 +909,7 @@ describe({ continuousStealing: false, backPressureStealing: false, backPressure: false, + queuedTaskAbortion: false, }) } await pool.destroy() @@ -971,8 +973,11 @@ describe({ new TypeError('name argument must not be an empty string'), ) await expect(pool.execute(undefined, undefined, {})).rejects.toThrow( - new TypeError('transferList argument must be an array'), + new TypeError('abortSignal argument must be an AbortSignal'), ) + await expect( + pool.execute(undefined, undefined, new AbortController().signal, {}), + ).rejects.toThrow(new TypeError('transferList argument must be an array')) await expect(pool.execute(undefined, 'unknown')).rejects.toThrow( new Error("Task function 'unknown' not found"), ) @@ -1909,13 +1914,40 @@ describe({ await expect(pool.mapExecute([undefined], '')).rejects.toThrow( new TypeError('name argument must not be an empty string'), ) - await expect(pool.mapExecute([undefined], undefined, {})).rejects.toThrow( - new TypeError('transferList argument must be an array'), + await expect(pool.mapExecute([undefined], undefined, 0)).rejects.toThrow( + new TypeError('abortSignals argument must be an iterable'), + ) + await expect( + pool.mapExecute([undefined], undefined, [undefined]), + ).rejects.toThrow( + new TypeError( + 'abortSignals argument must be an iterable of AbortSignal', + ), + ) + await expect( + pool.mapExecute([undefined], undefined, [ + new AbortController().signal, + new AbortController().signal, + ]), + ).rejects.toThrow( + new Error('data and abortSignals arguments must have the same length'), ) + await expect( + pool.mapExecute( + [undefined], + undefined, + [new AbortController().signal], + {}, + ), + ).rejects.toThrow(new TypeError('transferList argument must be an array')) await expect(pool.mapExecute([undefined], 'unknown')).rejects.toThrow( new Error("Task function 'unknown' not found"), ) - let results = await pool.mapExecute([{}, {}, {}, {}]) + let results = await pool.mapExecute( + Array(4).fill({}), + 'jsonIntegerSerialization', + Array(4).fill(AbortSignal.timeout(1000)), + ) expect(results).toStrictEqual([ { ok: 1 }, { ok: 1 }, @@ -1936,6 +1968,7 @@ describe({ }, ], 'factorial', + Array(4).fill(AbortSignal.timeout(1000)), ) expect(results).toStrictEqual([ 3628800n, @@ -1948,6 +1981,12 @@ describe({ results = await pool.mapExecute( new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]), 'factorial', + new Set([ + AbortSignal.timeout(1000), + AbortSignal.timeout(1500), + AbortSignal.timeout(2000), + AbortSignal.timeout(2500), + ]), ) expect(results).toStrictEqual([ 3628800n, diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index 2edf62beb..fb3df3679 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -33,13 +33,21 @@ describe({ }) it('Verify that the function is executed in a worker thread', async () => { - let result = await pool.execute({ - function: TaskFunctions.fibonacci, - }) + let result = await pool.execute( + { + function: TaskFunctions.fibonacci, + }, + 'default', + AbortSignal.timeout(2000), + ) expect(result).toBe(354224848179261915075n) - result = await pool.execute({ - function: TaskFunctions.factorial, - }) + result = await pool.execute( + { + function: TaskFunctions.factorial, + }, + 'default', + AbortSignal.timeout(2000), + ) expect(result).toBe( 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000n, ) diff --git a/tests/pools/thread/fixed.test.mjs b/tests/pools/thread/fixed.test.mjs index c5e6625a0..5e0aac658 100644 --- a/tests/pools/thread/fixed.test.mjs +++ b/tests/pools/thread/fixed.test.mjs @@ -81,13 +81,21 @@ describe({ }) it('Verify that the function is executed in a worker thread', async () => { - let result = await pool.execute({ - function: TaskFunctions.fibonacci, - }) + let result = await pool.execute( + { + function: TaskFunctions.fibonacci, + }, + 'default', + AbortSignal.timeout(2000), + ) expect(result).toBe(354224848179261915075n) - result = await pool.execute({ - function: TaskFunctions.factorial, - }) + result = await pool.execute( + { + function: TaskFunctions.factorial, + }, + 'default', + AbortSignal.timeout(2000), + ) expect(result).toBe( 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000n, ) @@ -179,7 +187,7 @@ describe({ let error let result try { - result = await pool.execute(undefined, undefined, [ + result = await pool.execute(undefined, undefined, undefined, [ new ArrayBuffer(16), new MessageChannel().port1, ]) @@ -189,7 +197,7 @@ describe({ expect(result).toStrictEqual({ ok: 1 }) expect(error).toBeUndefined() try { - result = await pool.execute(undefined, undefined, [ + result = await pool.execute(undefined, undefined, undefined, [ new SharedArrayBuffer(16), ]) } catch (e) { @@ -214,6 +222,7 @@ describe({ expect(inError).toBeInstanceOf(Error) expect(inError.message).toStrictEqual('Error Message from ThreadWorker') expect(taskError).toStrictEqual({ + aborted: false, data, error: inError, name: DEFAULT_TASK_NAME, @@ -245,6 +254,7 @@ describe({ 'Error Message from ThreadWorker:async', ) expect(taskError).toStrictEqual({ + aborted: false, data, error: inError, name: DEFAULT_TASK_NAME, @@ -256,6 +266,33 @@ describe({ ).toBe(true) }) + it('Verify that task can be aborted', async () => { + let error + + try { + await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500)) + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.name).toBe('TimeoutError') + expect(error.message).toBe('Signal timed out.') + expect(error.stack).toBeDefined() + + const abortController = new AbortController() + setTimeout(() => { + abortController.abort(new Error('Task aborted')) + }, 500) + try { + await asyncErrorPool.execute({}, 'default', abortController.signal) + } catch (e) { + error = e + } + expect(error).toBeInstanceOf(Error) + expect(error.message).toBe('Task aborted') + expect(error.stack).toBeDefined() + }) + it('Verify that async function is working properly', async () => { const data = { f: 10 } const startTime = performance.now() diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index 8449ceab6..9ab5bd5fa 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -24,7 +24,7 @@ describe('Pool utils test suite', () => { const poolMaxSize = 4 expect(getDefaultTasksQueueOptions(poolMaxSize)).toStrictEqual({ concurrency: 1, - size: Math.pow(poolMaxSize, 2), + size: poolMaxSize ** 2, taskStealing: true, tasksStealingOnBackPressure: true, tasksStealingRatio: 0.6, @@ -123,6 +123,7 @@ describe('Pool utils test suite', () => { backPressure: false, backPressureStealing: false, continuousStealing: false, + queuedTaskAbortion: false, dynamic: false, id: expect.any(String), ready: false, diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index baf4477ba..8b73f9fdc 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -2,8 +2,8 @@ import { expect } from '@std/expect' import { after, before, describe, it } from '@std/testing/bdd' import { CircularBuffer } from '../../src/circular-buffer.ts' import { WorkerTypes } from '../../src/mod.ts' -import { WorkerNode } from '../../src/pools/worker-node.ts' import { MeasurementHistorySize } from '../../src/pools/worker.ts' +import { WorkerNode } from '../../src/pools/worker-node.ts' import { PriorityQueue } from '../../src/queues/priority-queue.ts' import { DEFAULT_TASK_NAME } from '../../src/utils.ts' @@ -238,6 +238,7 @@ describe('Worker node test suite', () => { continuousStealing: false, backPressureStealing: false, backPressure: false, + queuedTaskAbortion: false, }) expect(threadWorkerNode.usage).toStrictEqual({ tasks: { diff --git a/tests/queues/fixed-priority-queue.test.mjs b/tests/queues/fixed-priority-queue.test.mjs index 53f60fa41..3b27ce09b 100644 --- a/tests/queues/fixed-priority-queue.test.mjs +++ b/tests/queues/fixed-priority-queue.test.mjs @@ -139,6 +139,41 @@ describe('Fixed priority queue test suite', () => { expect(fixedPriorityQueue.capacity).toBe(queueSize) }) + it('Verify delete() behavior', () => { + const fixedPriorityQueue = new FixedPriorityQueue() + fixedPriorityQueue.enqueue(1) + fixedPriorityQueue.enqueue(2, -1) + fixedPriorityQueue.enqueue(3) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(3) + expect(fixedPriorityQueue.nodeArray).toMatchObject([ + { data: 2, priority: -1 }, + { data: 1, priority: 0 }, + { data: 3, priority: 0 }, + ]) + expect(fixedPriorityQueue.delete(2)).toBe(true) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(2) + expect(fixedPriorityQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + { data: 3, priority: 0 }, + ]) + expect(fixedPriorityQueue.delete(3)).toBe(true) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(1) + expect(fixedPriorityQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + ]) + expect(fixedPriorityQueue.delete(1)).toBe(true) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(0) + expect(fixedPriorityQueue.nodeArray).toMatchObject([]) + expect(fixedPriorityQueue.delete(2)).toBe(false) + expect(fixedPriorityQueue.start).toBe(0) + expect(fixedPriorityQueue.size).toBe(0) + expect(fixedPriorityQueue.nodeArray).toMatchObject([]) + }) + it('Verify iterator behavior', () => { const fixedPriorityQueue = new FixedPriorityQueue() fixedPriorityQueue.enqueue(1) diff --git a/tests/queues/fixed-queue.test.mjs b/tests/queues/fixed-queue.test.mjs index f847fa3a6..cab0a6ce1 100644 --- a/tests/queues/fixed-queue.test.mjs +++ b/tests/queues/fixed-queue.test.mjs @@ -137,6 +137,39 @@ describe('Fixed queue test suite', () => { expect(fixedQueue.capacity).toBe(queueSize) }) + it('Verify delete() behavior', () => { + const fixedQueue = new FixedQueue() + fixedQueue.enqueue(1) + fixedQueue.enqueue(2, -1) + fixedQueue.enqueue(3) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(3) + expect(fixedQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + { data: 2, priority: -1 }, + { data: 3, priority: 0 }, + ]) + expect(fixedQueue.delete(2)).toBe(true) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(2) + expect(fixedQueue.nodeArray).toMatchObject([ + { data: 1, priority: 0 }, + { data: 3, priority: 0 }, + ]) + expect(fixedQueue.delete(3)).toBe(true) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(1) + expect(fixedQueue.nodeArray).toMatchObject([{ data: 1, priority: 0 }]) + expect(fixedQueue.delete(1)).toBe(true) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(0) + expect(fixedQueue.nodeArray).toMatchObject([]) + expect(fixedQueue.delete(2)).toBe(false) + expect(fixedQueue.start).toBe(0) + expect(fixedQueue.size).toBe(0) + expect(fixedQueue.nodeArray).toMatchObject([]) + }) + it('Verify iterator behavior', () => { const fixedQueue = new FixedQueue() fixedQueue.enqueue(1) diff --git a/tests/queues/priority-queue.test.mjs b/tests/queues/priority-queue.test.mjs index 62544b1d7..556c06740 100644 --- a/tests/queues/priority-queue.test.mjs +++ b/tests/queues/priority-queue.test.mjs @@ -301,6 +301,112 @@ describe('Priority queue test suite', () => { expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) }) + it('Verify default bucket size delete() behavior', () => { + const priorityQueue = new PriorityQueue(defaultBucketSize, true) + priorityQueue.enqueue(1) + priorityQueue.enqueue(2) + priorityQueue.enqueue(3) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(3) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(2) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(1) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(1)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(0) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(true) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(false) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(0) + expect(priorityQueue.maxSize).toBe(3) + expect(priorityQueue.tail.empty()).toBe(true) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + }) + + it('Verify bucketSize=2 delete() behavior', () => { + const priorityQueue = new PriorityQueue(2, true) + priorityQueue.enqueue(1) + priorityQueue.enqueue(2) + priorityQueue.enqueue(3) + priorityQueue.enqueue(3, -1) + priorityQueue.enqueue(1, 1) + priorityQueue.enqueue(3, -2) + expect(priorityQueue.buckets).toBe(3) + expect(priorityQueue.size).toBe(6) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(true) + expect(priorityQueue.buckets).toBe(2) + expect(priorityQueue.size).toBe(5) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(2) + expect(priorityQueue.size).toBe(4) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(1)).toBe(true) + expect(priorityQueue.buckets).toBe(1) + expect(priorityQueue.size).toBe(3) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(1)).toBe(true) + expect(priorityQueue.buckets).toBe(1) + expect(priorityQueue.size).toBe(2) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue) + expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(1) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(2)).toBe(false) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(1) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(false) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + expect(priorityQueue.delete(3)).toBe(true) + expect(priorityQueue.buckets).toBe(0) + expect(priorityQueue.size).toBe(0) + expect(priorityQueue.maxSize).toBe(6) + expect(priorityQueue.tail.empty()).toBe(true) + expect(priorityQueue.tail.next).toBe(undefined) + expect(priorityQueue.tail).toStrictEqual(priorityQueue.head) + }) + it('Verify enablePriority setter behavior', () => { const priorityQueue = new PriorityQueue(2) expect(priorityQueue.enablePriority).toBe(false) diff --git a/tests/test-utils.mjs b/tests/test-utils.mjs index 51d81634a..18901e2af 100644 --- a/tests/test-utils.mjs +++ b/tests/test-utils.mjs @@ -104,14 +104,13 @@ export const fibonacci = (n) => { export const factorial = (n) => { if (n === 0 || n === 1) { return 1n - } else { - n = BigInt(n) - let factorial = 1n - for (let i = 1n; i <= n; i++) { - factorial *= i - } - return factorial } + n = BigInt(n) + let factorial = 1n + for (let i = 1n; i <= n; i++) { + factorial *= i + } + return factorial } export const executeTaskFunction = (data) => { diff --git a/tests/worker/abort-error.test.mjs b/tests/worker/abort-error.test.mjs new file mode 100644 index 000000000..67b39e995 --- /dev/null +++ b/tests/worker/abort-error.test.mjs @@ -0,0 +1,17 @@ +import { expect } from '@std/expect' +import { describe, it } from '@std/testing/bdd' + +import { AbortError } from '../../src/worker/abort-error.ts' + +describe('Abort error test suite', () => { + it('Verify constructor() behavior', () => { + const errorMessage = 'This is an abort error message' + const abortError = new AbortError(errorMessage) + + expect(abortError).toBeInstanceOf(AbortError) + expect(abortError).toBeInstanceOf(Error) + expect(abortError.name).toBe('AbortError') + expect(abortError.message).toBe(errorMessage) + expect(abortError.stack).toBeDefined() + }) +})