Skip to content

Commit b17824a

Browse files
authored
feat: add queue success and failure events (#2481)
Adds extra events to the Queue class that allow access to more context during the event, for example being able to access the options that were passed to a job.
1 parent 2c56203 commit b17824a

File tree

2 files changed

+99
-3
lines changed

2 files changed

+99
-3
lines changed

packages/utils/src/queue/index.ts

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,63 @@ export interface JobMatcher<JobOptions extends QueueAddOptions = QueueAddOptions
4444
(options?: Partial<JobOptions>): boolean
4545
}
4646

47-
export interface QueueEvents<JobReturnType> {
47+
export interface QueueJobSuccess<JobReturnType, JobOptions extends QueueAddOptions = QueueAddOptions> {
48+
job: Job<JobOptions, JobReturnType>
49+
result: JobReturnType
50+
}
51+
52+
export interface QueueJobFailure<JobReturnType, JobOptions extends QueueAddOptions = QueueAddOptions> {
53+
job: Job<JobOptions, JobReturnType>
54+
error: Error
55+
}
56+
57+
export interface QueueEvents<JobReturnType, JobOptions extends QueueAddOptions = QueueAddOptions> {
58+
/**
59+
* A job is about to start running
60+
*/
4861
'active': CustomEvent
62+
63+
/**
64+
* All jobs have finished and the queue is empty
65+
*/
4966
'idle': CustomEvent
67+
68+
/**
69+
* The queue is empty, jobs may be running
70+
*/
5071
'empty': CustomEvent
72+
73+
/**
74+
* A job was added to the queue
75+
*/
5176
'add': CustomEvent
77+
78+
/**
79+
* A job has finished or failed
80+
*/
5281
'next': CustomEvent
82+
83+
/**
84+
* A job has finished successfully
85+
*/
5386
'completed': CustomEvent<JobReturnType>
87+
88+
/**
89+
* A job has failed
90+
*/
5491
'error': CustomEvent<Error>
92+
93+
/**
94+
* Emitted just after `"completed", a job has finished successfully - this
95+
* event gives access to the job and it's result
96+
*/
97+
'success': CustomEvent<QueueJobSuccess<JobReturnType, JobOptions>>
98+
99+
/**
100+
* Emitted just after `"error", a job has failed - this event gives access to
101+
* the job and the thrown error
102+
*/
103+
'failure': CustomEvent<QueueJobFailure<JobReturnType, JobOptions>>
55104
}
56105

57106
// Port of lower_bound from https://en.cppreference.com/w/cpp/algorithm/lower_bound
@@ -81,7 +130,7 @@ function lowerBound<T> (array: readonly T[], value: T, comparator: (a: T, b: T)
81130
* 1. Items remain at the head of the queue while they are running so `queue.size` includes `queue.pending` items - this is so interested parties can join the results of a queue item while it is running
82131
* 2. The options for a job are stored separately to the job in order for them to be modified while they are still in the queue
83132
*/
84-
export class Queue<JobReturnType = unknown, JobOptions extends QueueAddOptions = QueueAddOptions> extends TypedEventEmitter<QueueEvents<JobReturnType>> {
133+
export class Queue<JobReturnType = unknown, JobOptions extends QueueAddOptions = QueueAddOptions> extends TypedEventEmitter<QueueEvents<JobReturnType, JobOptions>> {
85134
public concurrency: number
86135
public queue: Array<Job<JobOptions, JobReturnType>>
87136
private pending: number
@@ -189,11 +238,13 @@ export class Queue<JobReturnType = unknown, JobOptions extends QueueAddOptions =
189238
const p = job.join(options)
190239
.then(result => {
191240
this.safeDispatchEvent('completed', { detail: result })
241+
this.safeDispatchEvent('success', { detail: { job, result } })
192242

193243
return result
194244
})
195245
.catch(err => {
196246
this.safeDispatchEvent('error', { detail: err })
247+
this.safeDispatchEvent('failure', { detail: { job, error: err } })
197248

198249
throw err
199250
})

packages/utils/test/peer-job-queue.spec.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
44
import { expect } from 'aegir/chai'
5+
import delay from 'delay'
56
import pDefer from 'p-defer'
6-
import { PeerQueue } from '../src/peer-queue.js'
7+
import { raceEvent } from 'race-event'
8+
import { PeerQueue, type PeerQueueJobOptions } from '../src/peer-queue.js'
9+
import type { QueueJobFailure, QueueJobSuccess } from '../src/queue/index.js'
710

811
describe('peer queue', () => {
912
it('should have jobs', async () => {
@@ -116,4 +119,46 @@ describe('peer queue', () => {
116119

117120
deferred.resolve(value)
118121
})
122+
123+
it('emits success event', async () => {
124+
const value = 'hello world'
125+
126+
const peerIdA = await createEd25519PeerId()
127+
const queue = new PeerQueue<string>({
128+
concurrency: 1
129+
})
130+
131+
void queue.add(async () => {
132+
await delay(100)
133+
return value
134+
}, {
135+
peerId: peerIdA
136+
}).catch(() => {})
137+
138+
const event = await raceEvent<CustomEvent<QueueJobSuccess<string, PeerQueueJobOptions>>>(queue, 'success')
139+
140+
expect(event.detail.job.options.peerId).to.equal(peerIdA)
141+
expect(event.detail.result).to.equal(value)
142+
})
143+
144+
it('emits failure event', async () => {
145+
const err = new Error('Oh no!')
146+
147+
const peerIdA = await createEd25519PeerId()
148+
const queue = new PeerQueue<string>({
149+
concurrency: 1
150+
})
151+
152+
void queue.add(async () => {
153+
await delay(100)
154+
throw err
155+
}, {
156+
peerId: peerIdA
157+
}).catch(() => {})
158+
159+
const event = await raceEvent<CustomEvent<QueueJobFailure<string, PeerQueueJobOptions>>>(queue, 'failure')
160+
161+
expect(event.detail.job.options.peerId).to.equal(peerIdA)
162+
expect(event.detail.error).to.equal(err)
163+
})
119164
})

0 commit comments

Comments
 (0)