Skip to content

Commit e289c1d

Browse files
committed
fix: properly handle task abortion worker side
Signed-off-by: Jérôme Benoit <[email protected]>
1 parent 0b1a2d9 commit e289c1d

File tree

5 files changed

+51
-24
lines changed

5 files changed

+51
-24
lines changed

src/worker/abstract-worker.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -342,16 +342,17 @@ export abstract class AbstractWorker<
342342
protected messageEventListener(
343343
messageEvent: MessageEvent<MessageValue<Data>>,
344344
): void {
345-
const { data } = messageEvent
346-
this.checkMessageWorkerId(data)
345+
const { data: messageData } = messageEvent
346+
this.checkMessageWorkerId(messageData)
347347
const {
348+
data,
348349
statistics,
349350
checkActive,
350351
taskFunctionOperation,
351352
taskId,
352353
taskOperation,
353354
kill,
354-
} = data
355+
} = messageData
355356
if (statistics != null) {
356357
// Statistics message received
357358
this.statistics = statistics
@@ -360,18 +361,18 @@ export abstract class AbstractWorker<
360361
checkActive ? this.startCheckActive() : this.stopCheckActive()
361362
} else if (taskFunctionOperation != null) {
362363
// Task function operation message received
363-
this.handleTaskFunctionOperationMessage(data)
364+
this.handleTaskFunctionOperationMessage(messageData)
364365
} else if (taskId != null && data != null) {
365366
// Task message received
366-
this.run(data)
367+
this.run(messageData)
367368
} else if (taskOperation === 'abort' && taskId != null) {
368369
// Abort task operation message received
369370
if (this.taskAbortFunctions.has(taskId)) {
370371
this.taskAbortFunctions.get(taskId)?.()
371372
}
372373
} else if (kill === true) {
373374
// Kill message received
374-
this.handleKillMessage(data)
375+
this.handleKillMessage(messageData)
375376
}
376377
}
377378

tests/pools/abstract-pool.test.mjs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1943,7 +1943,11 @@ describe({
19431943
await expect(pool.mapExecute([undefined], 'unknown')).rejects.toThrow(
19441944
new Error("Task function 'unknown' not found"),
19451945
)
1946-
let results = await pool.mapExecute([{}, {}, {}, {}])
1946+
let results = await pool.mapExecute(
1947+
Array(4).fill({}),
1948+
'jsonIntegerSerialization',
1949+
Array(4).fill(AbortSignal.timeout(1000)),
1950+
)
19471951
expect(results).toStrictEqual([
19481952
{ ok: 1 },
19491953
{ ok: 1 },
@@ -1964,6 +1968,7 @@ describe({
19641968
},
19651969
],
19661970
'factorial',
1971+
Array(4).fill(AbortSignal.timeout(1000)),
19671972
)
19681973
expect(results).toStrictEqual([
19691974
3628800n,
@@ -1976,6 +1981,12 @@ describe({
19761981
results = await pool.mapExecute(
19771982
new Set([{ n: 10 }, { n: 20 }, { n: 30 }, { n: 40 }]),
19781983
'factorial',
1984+
new Set([
1985+
AbortSignal.timeout(1000),
1986+
AbortSignal.timeout(1500),
1987+
AbortSignal.timeout(2000),
1988+
AbortSignal.timeout(2500),
1989+
]),
19791990
)
19801991
expect(results).toStrictEqual([
19811992
3628800n,

tests/pools/thread/dynamic.test.mjs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,21 @@ describe({
3333
})
3434

3535
it('Verify that the function is executed in a worker thread', async () => {
36-
let result = await pool.execute({
37-
function: TaskFunctions.fibonacci,
38-
})
36+
let result = await pool.execute(
37+
{
38+
function: TaskFunctions.fibonacci,
39+
},
40+
'default',
41+
AbortSignal.timeout(2000),
42+
)
3943
expect(result).toBe(354224848179261915075n)
40-
result = await pool.execute({
41-
function: TaskFunctions.factorial,
42-
})
44+
result = await pool.execute(
45+
{
46+
function: TaskFunctions.factorial,
47+
},
48+
'default',
49+
AbortSignal.timeout(2000),
50+
)
4351
expect(result).toBe(
4452
93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000n,
4553
)

tests/pools/thread/fixed.test.mjs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,21 @@ describe({
8181
})
8282

8383
it('Verify that the function is executed in a worker thread', async () => {
84-
let result = await pool.execute({
85-
function: TaskFunctions.fibonacci,
86-
})
84+
let result = await pool.execute(
85+
{
86+
function: TaskFunctions.fibonacci,
87+
},
88+
'default',
89+
AbortSignal.timeout(2000),
90+
)
8791
expect(result).toBe(354224848179261915075n)
88-
result = await pool.execute({
89-
function: TaskFunctions.factorial,
90-
})
92+
result = await pool.execute(
93+
{
94+
function: TaskFunctions.factorial,
95+
},
96+
'default',
97+
AbortSignal.timeout(2000),
98+
)
9199
expect(result).toBe(
92100
93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000n,
93101
)
@@ -258,18 +266,17 @@ describe({
258266
).toBe(true)
259267
})
260268

261-
it.only('Verify that task can be aborted', async () => {
269+
it('Verify that task can be aborted', async () => {
262270
let error
263271

264272
try {
265273
await asyncErrorPool.execute({}, 'default', AbortSignal.timeout(500))
266274
} catch (e) {
267275
error = e
268276
}
269-
console.info(error)
270277
expect(error).toBeInstanceOf(Error)
271278
expect(error.name).toBe('TimeoutError')
272-
expect(error.message).toBe('The operation was aborted due to timeout')
279+
expect(error.message).toBe('Signal timed out.')
273280
expect(error.stack).toBeDefined()
274281

275282
const abortController = new AbortController()

tests/utils.test.mjs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ describe('Utils test suite', () => {
168168
expect(isAsyncFunction({})).toBe(false)
169169
expect(isAsyncFunction({ a: 1 })).toBe(false)
170170
expect(isAsyncFunction(() => {})).toBe(false)
171-
expect(isAsyncFunction(() => {})).toBe(false)
171+
expect(isAsyncFunction(function () {})).toBe(false)
172172
expect(isAsyncFunction(function named() {})).toBe(false)
173173
expect(isAsyncFunction(async () => {})).toBe(true)
174-
expect(isAsyncFunction(async () => {})).toBe(true)
174+
expect(isAsyncFunction(async function () {})).toBe(true)
175175
expect(isAsyncFunction(async function named() {})).toBe(true)
176176
class TestClass {
177177
testSync() {}

0 commit comments

Comments
 (0)