Skip to content

Commit f76a8ee

Browse files
committed
feat: add support for job Signal
close #88
1 parent 084a732 commit f76a8ee

File tree

3 files changed

+41
-6
lines changed

3 files changed

+41
-6
lines changed

docs/docs/pages/guides/creating-jobs.mdx

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,34 @@ export default class SendEmailJob extends Job<SendEmailJobData, SendEmailJobRetu
9898
// Logger instance
9999
this.logger.info('Processing email job', { to: emailData.to })
100100

101+
// AbortSignal - aborted when the worker is shutting down or the job is cancelled
102+
const signal = this.signal
103+
101104
return { messageId: 'abc123', success: true }
102105
}
103106
}
104107
```
105108

109+
### Abort Signal
110+
111+
When the worker shuts down gracefully or a job is explicitly cancelled, BullMQ aborts the `AbortSignal` available via `this.signal`. You can use it to stop long-running work early:
112+
113+
```typescript
114+
export default class CrawlPagesJob extends Job<CrawlData, void> {
115+
async process(): Promise<void> {
116+
// Pass it directly to fetch or any AbortSignal-aware API
117+
const response = await fetch(this.data.url, { signal: this.signal })
118+
119+
// Or check it manually in a loop
120+
for (const page of this.data.pages) {
121+
if (this.signal?.aborted) break
122+
123+
await this.crawlPage(page)
124+
}
125+
}
126+
}
127+
```
128+
106129
## Default Queue
107130

108131
Set a default queue for a job class:

packages/core/src/job/base_job.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ export abstract class BaseJob<DataType, ReturnType> {
5858
worker!: BullWorker<DataType, ReturnType>
5959
logger!: Logger
6060
token?: string
61+
signal?: AbortSignal
6162
error?: Error
6263

6364
abstract process(...args: any[]): Promise<ReturnType>
@@ -131,6 +132,7 @@ export abstract class BaseJob<DataType, ReturnType> {
131132
job: BullJob<DataType, ReturnType>,
132133
token: string | undefined,
133134
logger: Logger,
135+
signal: AbortSignal | undefined,
134136
) {
135137
if (jobClass.encrypted) {
136138
job.data = jobClass.decrypt(job.data as string)
@@ -140,6 +142,7 @@ export abstract class BaseJob<DataType, ReturnType> {
140142
this.job = job
141143
this.data = job.data
142144
this.token = token
145+
this.signal = signal
143146
this.logger = logger.child({ jobName: job.name, jobId: job.id })
144147
}
145148

packages/core/src/worker/worker.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export class Worker<KnownQueues extends Record<string, QueueConfig> = Queues> {
4646

4747
return BullMqFactory.createWorker(
4848
String(this.#queueName),
49-
async (job, token) => this.#processJob(job, token),
49+
async (job, token, signal) => this.#processJob(job, token, signal),
5050
{
5151
autorun: false,
5252
connection,
@@ -105,8 +105,13 @@ export class Worker<KnownQueues extends Record<string, QueueConfig> = Queues> {
105105
/**
106106
* Create a job instance with all dependencies configured
107107
*/
108-
async #createJobInstance(options: { job: BullJob; token?: string; error?: Error }) {
109-
const { job, token, error } = options
108+
async #createJobInstance(options: {
109+
job: BullJob
110+
token?: string
111+
signal?: AbortSignal
112+
error?: Error
113+
}) {
114+
const { job, token, signal, error } = options
110115

111116
const JobClass = this.#getJobClass(job.name)
112117
const adonisLogger = this.#createTracedLogger()
@@ -116,7 +121,7 @@ export class Worker<KnownQueues extends Record<string, QueueConfig> = Queues> {
116121
resolver.bindValue(Logger, jobLogger)
117122

118123
const instance = await resolver.make(JobClass)
119-
instance.$init(this.#bullWorker!, JobClass, job, token, jobLogger)
124+
instance.$init(this.#bullWorker!, JobClass, job, token, jobLogger, signal)
120125

121126
if (error) instance.$setError(error)
122127

@@ -138,8 +143,12 @@ export class Worker<KnownQueues extends Record<string, QueueConfig> = Queues> {
138143
/**
139144
* Start processing a job
140145
*/
141-
async #processJob(job: BullJob, token?: string) {
142-
const { instance: jobInstance, resolver } = await this.#createJobInstance({ job, token })
146+
async #processJob(job: BullJob, token?: string, signal?: AbortSignal) {
147+
const { instance: jobInstance, resolver } = await this.#createJobInstance({
148+
job,
149+
token,
150+
signal,
151+
})
143152

144153
this.#setDefaultSpanAttributes(jobInstance)
145154

0 commit comments

Comments
 (0)