Skip to content

Commit f45dc5d

Browse files
authored
fix: ensure abort listeners are removed from queue jobs (#2482)
Fixes a bug where aborted queue jobs were not removing abort signal listeners properly. Also removes the aborted job from the queue immediately to reduce size and allow garbage collection sooner.
1 parent ebb8db8 commit f45dc5d

File tree

5 files changed

+142
-2
lines changed

5 files changed

+142
-2
lines changed

packages/utils/src/queue/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,16 @@ export class Queue<JobReturnType = unknown, JobOptions extends QueueAddOptions =
243243
return result
244244
})
245245
.catch(err => {
246+
if (job.status === 'queued') {
247+
// job was aborted before it started - remove the job from the queue
248+
for (let i = 0; i < this.queue.length; i++) {
249+
if (this.queue[i] === job) {
250+
this.queue.splice(i, 1)
251+
break
252+
}
253+
}
254+
}
255+
246256
this.safeDispatchEvent('error', { detail: err })
247257
this.safeDispatchEvent('failure', { detail: { job, error: err } })
248258

packages/utils/src/queue/job.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export class Job <JobOptions extends AbortOptions = AbortOptions, JobReturnType
5656
// if all recipients have aborted the job, actually abort the job
5757
if (allAborted) {
5858
this.controller.abort(new AbortError())
59+
this.cleanup()
5960
}
6061
}
6162

@@ -99,6 +100,7 @@ export class Job <JobOptions extends AbortOptions = AbortOptions, JobReturnType
99100

100101
cleanup (): void {
101102
this.recipients.forEach(recipient => {
103+
recipient.cleanup()
102104
recipient.signal?.removeEventListener('abort', this.onAbort)
103105
})
104106
}

packages/utils/src/queue/recipient.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export class JobRecipient<JobReturnType> {
1717
}
1818

1919
onAbort (): void {
20-
this.deferred.reject(new AbortError())
20+
this.deferred.reject(this.signal?.reason ?? new AbortError())
2121
}
2222

2323
cleanup (): void {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { TypedEventEmitter } from '@libp2p/interface'
2+
3+
export interface TestSignalEvents {
4+
abort: CustomEvent
5+
}
6+
7+
export class TestSignal extends TypedEventEmitter<TestSignalEvents> {
8+
public aborted: boolean
9+
public reason: any
10+
11+
constructor () {
12+
super()
13+
14+
this.aborted = false
15+
}
16+
17+
throwIfAborted (): void {
18+
19+
}
20+
21+
onabort (): void {
22+
23+
}
24+
25+
abort (reason?: any): void {
26+
this.aborted = true
27+
this.reason = reason
28+
this.safeDispatchEvent('abort')
29+
}
30+
}

packages/utils/test/queue.spec.ts

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { expect } from 'aegir/chai'
22
import delay from 'delay'
33
import all from 'it-all'
44
import pDefer from 'p-defer'
5-
import { Queue } from '../src/queue/index.js'
5+
import { Queue, type QueueAddOptions } from '../src/queue/index.js'
6+
import { TestSignal } from './fixtures/test-signal.js'
67

78
const fixture = Symbol('fixture')
89

@@ -12,6 +13,10 @@ function randomInt (minimum: number, maximum: number): number {
1213
)
1314
}
1415

16+
interface SlowJobQueueOptions extends QueueAddOptions {
17+
slow: boolean
18+
}
19+
1520
describe('queue', () => {
1621
it('adds', async () => {
1722
const queue = new Queue<symbol>({})
@@ -716,4 +721,97 @@ describe('queue', () => {
716721
expect(collected).to.deep.equal([0, 1])
717722
expect(queue.size).to.equal(0)
718723
})
724+
725+
it('cleans up listeners after all job recipients abort', async () => {
726+
const queue = new Queue<void, SlowJobQueueOptions>({ concurrency: 1 })
727+
void queue.add(async () => {
728+
await delay(100)
729+
}, {
730+
slow: true
731+
})
732+
733+
const signal = new TestSignal()
734+
735+
const jobResult = queue.add(async () => {}, {
736+
slow: false,
737+
signal
738+
})
739+
740+
expect(queue.size).to.equal(2)
741+
expect(queue.queued).to.equal(1)
742+
expect(queue.running).to.equal(1)
743+
744+
const slowJob = queue.queue.find(job => !job.options.slow)
745+
746+
expect(slowJob?.recipients).to.have.lengthOf(1)
747+
expect(slowJob?.recipients[0].signal).to.equal(signal)
748+
749+
// listeners added
750+
expect(signal.listenerCount('abort')).to.equal(2)
751+
752+
// abort job stuck in queue
753+
signal.abort()
754+
755+
// all listeners removed
756+
expect(signal.listenerCount('abort')).to.equal(0)
757+
758+
await expect(jobResult).to.eventually.be.rejected
759+
.with.property('code', 'ABORT_ERR')
760+
})
761+
762+
it('rejects aborted jobs with the abort reason if supplied', async () => {
763+
const queue = new Queue<void, SlowJobQueueOptions>({ concurrency: 1 })
764+
void queue.add(async () => {
765+
await delay(100)
766+
}, {
767+
slow: true
768+
})
769+
770+
const signal = new TestSignal()
771+
772+
const jobResult = queue.add(async () => {}, {
773+
slow: false,
774+
signal
775+
})
776+
777+
const err = new Error('Took too long')
778+
779+
// abort job stuck in queue
780+
signal.abort(err)
781+
782+
// result rejects
783+
await expect(jobResult).to.eventually.be.rejectedWith(err)
784+
})
785+
786+
it('immediately removes aborted job', async () => {
787+
const signal = new TestSignal()
788+
const queue = new Queue<void, SlowJobQueueOptions>({ concurrency: 1 })
789+
void queue.add(async () => {
790+
await delay(100)
791+
}, {
792+
slow: true
793+
})
794+
const jobResult = queue.add(async () => {}, {
795+
slow: false,
796+
signal
797+
})
798+
799+
expect(queue.size).to.equal(2)
800+
expect(queue.queued).to.equal(1)
801+
expect(queue.running).to.equal(1)
802+
803+
// abort job stuck in queue
804+
signal.abort()
805+
806+
await expect(jobResult).to.eventually.be.rejected
807+
.with.property('code', 'ABORT_ERR')
808+
809+
// counts updated
810+
expect(queue.size).to.equal(1)
811+
expect(queue.queued).to.equal(0)
812+
expect(queue.running).to.equal(1)
813+
814+
// job not in queue any more
815+
expect(queue.queue.find(job => !job.options.slow)).to.be.undefined()
816+
})
719817
})

0 commit comments

Comments
 (0)