Skip to content

Commit 144f0f4

Browse files
authored
chore(event-bus, workflow-engine): Enable more granualar queues configuration (medusajs#14201)
Summary This PR adds BullMQ queue and worker configuration options to the workflow-engine-redis module, bringing feature parity with the event-bus-redis module. It also introduces per-queue configuration options for fine-grained control over the three internal queues (main, job, and cleaner). Key changes: - Added per-queue BullMQ configuration options (mainQueueOptions, jobQueueOptions, cleanerQueueOptions and their worker counterparts) with shared defaults - Unified Redis option naming across modules: deprecated url → redisUrl, options → redisOptions (with backward compatibility) - Moved configuration resolution to the loader and registered options in the DI container - Added comprehensive JSDoc documentation for all configuration options - Added unit tests for option merging and queue/worker configuration Configuration Example ```ts // Simple configuration - same options for all queues { redisUrl: "redis://localhost:6379", queueOptions: { defaultJobOptions: { removeOnComplete: 1000 } }, workerOptions: { concurrency: 10 } } ``` ```ts // Advanced configuration - per-queue overrides { redisUrl: "redis://localhost:6379", workerOptions: { concurrency: 10 }, // shared default jobWorkerOptions: { concurrency: 5 }, // override for scheduled workflows cleanerWorkerOptions: { concurrency: 1 } // override for cleanup (low priority) } ```
1 parent 3e3e6c3 commit 144f0f4

File tree

10 files changed

+1011
-72
lines changed

10 files changed

+1011
-72
lines changed

.changeset/slimy-avocados-film.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@medusajs/event-bus-redis": patch
3+
"@medusajs/workflow-engine-redis": patch
4+
---
5+
6+
chore(): Enable more granualar queue configuration

packages/modules/event-bus-redis/src/loaders/index.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { LoaderOptions } from "@medusajs/framework/types"
21
import { asValue } from "@medusajs/framework/awilix"
2+
import { LoaderOptions } from "@medusajs/framework/types"
33
import Redis from "ioredis"
44
import { EOL } from "os"
55
import { EventBusRedisModuleOptions } from "../types"
@@ -9,7 +9,14 @@ export default async ({
99
logger,
1010
options,
1111
}: LoaderOptions): Promise<void> => {
12-
const { redisUrl, redisOptions } = options as EventBusRedisModuleOptions
12+
const {
13+
redisUrl,
14+
redisOptions,
15+
queueName,
16+
queueOptions,
17+
workerOptions,
18+
jobOptions,
19+
} = options as EventBusRedisModuleOptions
1320

1421
if (!redisUrl) {
1522
throw Error(
@@ -39,5 +46,9 @@ export default async ({
3946

4047
container.register({
4148
eventBusRedisConnection: asValue(connection),
49+
eventBusRedisQueueName: asValue(queueName ?? "events-queue"),
50+
eventBusRedisQueueOptions: asValue(queueOptions ?? {}),
51+
eventBusRedisWorkerOptions: asValue(workerOptions ?? {}),
52+
eventBusRedisJobOptions: asValue(jobOptions ?? {}),
4253
})
4354
}

packages/modules/event-bus-redis/src/services/__tests__/event-bus.ts

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@ const redisMock = {
2121
unlink: () => jest.fn(),
2222
} as unknown as Redis
2323

24-
const simpleModuleOptions = { redisUrl: "test-url" }
2524
const moduleDeps = {
2625
logger: loggerMock,
2726
eventBusRedisConnection: redisMock,
27+
eventBusRedisQueueName: "events-queue",
28+
eventBusRedisQueueOptions: {},
29+
eventBusRedisWorkerOptions: {},
30+
eventBusRedisJobOptions: {},
2831
}
2932

33+
const moduleDeclaration = { scope: "internal" } as any
34+
3035
describe("RedisEventBusService", () => {
3136
let eventBus: RedisEventBusService
3237
let queue
@@ -36,9 +41,7 @@ describe("RedisEventBusService", () => {
3641
beforeEach(async () => {
3742
jest.clearAllMocks()
3843

39-
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
40-
scope: "internal",
41-
})
44+
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
4245
})
4346

4447
it("Creates a queue + worker", () => {
@@ -62,9 +65,7 @@ describe("RedisEventBusService", () => {
6265

6366
it("Throws on isolated module declaration", () => {
6467
try {
65-
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
66-
scope: "internal",
67-
})
68+
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
6869
} catch (error) {
6970
expect(error.message).toEqual(
7071
"At the moment this module can only be used with shared resources"
@@ -78,9 +79,7 @@ describe("RedisEventBusService", () => {
7879
beforeEach(async () => {
7980
jest.clearAllMocks()
8081

81-
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
82-
scope: "internal",
83-
})
82+
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
8483

8584
queue = (eventBus as any).queue_
8685
queue.addBulk = jest.fn()
@@ -139,17 +138,15 @@ describe("RedisEventBusService", () => {
139138

140139
it("should add job to queue with module job options", async () => {
141140
eventBus = new RedisEventBusService(
142-
moduleDeps,
143141
{
144-
...simpleModuleOptions,
145-
jobOptions: {
142+
...moduleDeps,
143+
eventBusRedisJobOptions: {
146144
removeOnComplete: { age: 5 },
147145
attempts: 7,
148146
},
149147
},
150-
{
151-
scope: "internal",
152-
}
148+
{},
149+
moduleDeclaration
153150
)
154151

155152
queue = (eventBus as any).queue_
@@ -186,16 +183,14 @@ describe("RedisEventBusService", () => {
186183

187184
it("should add job to queue with default, local, and global options merged", async () => {
188185
eventBus = new RedisEventBusService(
189-
moduleDeps,
190186
{
191-
...simpleModuleOptions,
192-
jobOptions: {
187+
...moduleDeps,
188+
eventBusRedisJobOptions: {
193189
removeOnComplete: 5,
194190
},
195191
},
196-
{
197-
scope: "internal",
198-
}
192+
{},
193+
moduleDeclaration
199194
)
200195

201196
queue = (eventBus as any).queue_
@@ -340,9 +335,7 @@ describe("RedisEventBusService", () => {
340335
beforeEach(async () => {
341336
jest.clearAllMocks()
342337

343-
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
344-
scope: "internal",
345-
})
338+
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
346339

347340
queue = (eventBus as any).queue_
348341
queue.addBulk = jest.fn()
@@ -485,9 +478,7 @@ describe("RedisEventBusService", () => {
485478
beforeEach(async () => {
486479
jest.clearAllMocks()
487480

488-
eventBus = new RedisEventBusService(moduleDeps, simpleModuleOptions, {
489-
scope: "internal",
490-
})
481+
eventBus = new RedisEventBusService(moduleDeps, {}, moduleDeclaration)
491482
})
492483

493484
it("should process a simple event with no options", async () => {

packages/modules/event-bus-redis/src/services/event-bus-redis.ts

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,28 @@ import {
99
isPresent,
1010
promiseAll,
1111
} from "@medusajs/framework/utils"
12-
import { BulkJobOptions, Queue, Worker } from "bullmq"
12+
import {
13+
BulkJobOptions,
14+
Queue,
15+
QueueOptions,
16+
Worker,
17+
WorkerOptions,
18+
} from "bullmq"
1319
import { Redis } from "ioredis"
14-
import { BullJob, EventBusRedisModuleOptions, Options } from "../types"
20+
import {
21+
BullJob,
22+
EmitOptions,
23+
EventBusRedisModuleOptions,
24+
Options,
25+
} from "../types"
1526

1627
type InjectedDependencies = {
1728
logger: Logger
1829
eventBusRedisConnection: Redis
30+
eventBusRedisQueueName: string
31+
eventBusRedisQueueOptions: Omit<QueueOptions, "connection">
32+
eventBusRedisWorkerOptions: Omit<WorkerOptions, "connection">
33+
eventBusRedisJobOptions: EmitOptions
1934
}
2035

2136
type IORedisEventType<T = unknown> = {
@@ -31,46 +46,53 @@ type IORedisEventType<T = unknown> = {
3146
// eslint-disable-next-line max-len
3247
export default class RedisEventBusService extends AbstractEventBusModuleService {
3348
protected readonly logger_: Logger
34-
protected readonly moduleOptions_: EventBusRedisModuleOptions
35-
// eslint-disable-next-line max-len
36-
protected readonly moduleDeclaration_: InternalModuleDeclaration
3749
protected readonly eventBusRedisConnection_: Redis
3850

51+
protected readonly queueName_: string
52+
protected readonly queueOptions_: Omit<QueueOptions, "connection">
53+
protected readonly workerOptions_: Omit<WorkerOptions, "connection">
54+
protected readonly jobOptions_: EmitOptions
55+
3956
protected queue_: Queue
4057
protected bullWorker_: Worker
4158

4259
constructor(
43-
{ logger, eventBusRedisConnection }: InjectedDependencies,
44-
moduleOptions: EventBusRedisModuleOptions = {},
45-
moduleDeclaration: InternalModuleDeclaration
60+
{
61+
logger,
62+
eventBusRedisConnection,
63+
eventBusRedisQueueName,
64+
eventBusRedisQueueOptions,
65+
eventBusRedisWorkerOptions,
66+
eventBusRedisJobOptions,
67+
}: InjectedDependencies,
68+
_moduleOptions: EventBusRedisModuleOptions = {},
69+
_moduleDeclaration: InternalModuleDeclaration
4670
) {
4771
// @ts-ignore
48-
// eslint-disable-next-line prefer-rest-params
4972
super(...arguments)
5073

5174
this.eventBusRedisConnection_ = eventBusRedisConnection
52-
53-
this.moduleOptions_ = moduleOptions
5475
this.logger_ = logger
5576

56-
this.queue_ = new Queue(moduleOptions.queueName ?? `events-queue`, {
77+
this.queueName_ = eventBusRedisQueueName ?? "events-queue"
78+
this.queueOptions_ = eventBusRedisQueueOptions ?? {}
79+
this.workerOptions_ = eventBusRedisWorkerOptions ?? {}
80+
this.jobOptions_ = eventBusRedisJobOptions ?? {}
81+
82+
this.queue_ = new Queue(this.queueName_, {
5783
prefix: `${this.constructor.name}`,
58-
...(moduleOptions.queueOptions ?? {}),
84+
...this.queueOptions_,
5985
connection: eventBusRedisConnection,
6086
})
6187

6288
// Register our worker to handle emit calls
6389
if (this.isWorkerMode) {
64-
this.bullWorker_ = new Worker(
65-
moduleOptions.queueName ?? "events-queue",
66-
this.worker_,
67-
{
68-
prefix: `${this.constructor.name}`,
69-
...(moduleOptions.workerOptions ?? {}),
70-
connection: eventBusRedisConnection,
71-
autorun: false,
72-
}
73-
)
90+
this.bullWorker_ = new Worker(this.queueName_, this.worker_, {
91+
prefix: `${this.constructor.name}`,
92+
...this.workerOptions_,
93+
connection: eventBusRedisConnection,
94+
autorun: false,
95+
})
7496
}
7597
}
7698

@@ -97,7 +119,7 @@ export default class RedisEventBusService extends AbstractEventBusModuleService
97119
removeOnComplete: true,
98120
attempts: 1,
99121
// global options
100-
...(this.moduleOptions_.jobOptions ?? {}),
122+
...this.jobOptions_,
101123
...options,
102124
}
103125

packages/modules/event-bus-redis/src/types/index.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,31 @@ export type BullJob<T> = {
2626
export type EmitOptions = JobsOptions
2727

2828
export type EventBusRedisModuleOptions = {
29+
/**
30+
* Queue name for the event bus
31+
*/
2932
queueName?: string
30-
queueOptions?: QueueOptions
3133

32-
workerOptions?: WorkerOptions
34+
/**
35+
* Options for BullMQ Queue instance
36+
* @see https://api.docs.bullmq.io/interfaces/v5.QueueOptions.html
37+
*/
38+
queueOptions?: Omit<QueueOptions, "connection">
39+
40+
/**
41+
* Options for BullMQ Worker instance
42+
* @see https://api.docs.bullmq.io/interfaces/v5.WorkerOptions.html
43+
*/
44+
workerOptions?: Omit<WorkerOptions, "connection">
3345

46+
/**
47+
* Redis connection string
48+
*/
3449
redisUrl?: string
50+
51+
/**
52+
* Redis client options
53+
*/
3554
redisOptions?: RedisOptions
3655

3756
/**

0 commit comments

Comments
 (0)