Skip to content

Commit 2dd9100

Browse files
committed
feat: apply role-specific redis connection defaults options for queue vs worker
close #87
1 parent fb5cedc commit 2dd9100

File tree

5 files changed

+123
-8
lines changed

5 files changed

+123
-8
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@nemoventures/adonis-jobs': minor
3+
---
4+
5+
Apply role-specific Redis connection defaults following BullMQ production recommendations. When `useSharedConnection` is `false`, Queue instances now set `enableOfflineQueue: false` (fail fast during disconnections) and Worker instances set `maxRetriesPerRequest: null` (required by BullMQ for resilient reconnection).

packages/core/src/connection_resolver.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,18 @@ export class ConnectionResolver {
1414
}
1515

1616
/**
17-
* Resolve a connection configuration to BullMQ connection options
17+
* Resolve a connection configuration to BullMQ connection options.
18+
*
19+
* Applies role-specific IORedis defaults following BullMQ production
20+
* recommendations: queues should fail fast while workers should be
21+
* resilient to temporary disconnections.
1822
*/
19-
resolve(connectionConfig?: QueueConnectionConfig): BullConnectionOptions {
20-
if (!connectionConfig) connectionConfig = this.config.connection
23+
resolve(options?: {
24+
config?: QueueConnectionConfig
25+
role?: 'queue' | 'worker'
26+
}): BullConnectionOptions {
27+
const connectionConfig = options?.config ?? this.config.connection
28+
const role = options?.role ?? 'queue'
2129

2230
const redisConnection = this.redis.connection(
2331
connectionConfig.connectionName,
@@ -28,8 +36,17 @@ export class ConnectionResolver {
2836
/**
2937
* For non-shared connections, we return the connection options
3038
* in order to let BullMQ create a new IORedis instance.
39+
*
40+
* We apply different defaults based on the role:
41+
* - Queue: disable offline queue so calls fail fast during disconnections
42+
* - Worker: set maxRetriesPerRequest to null (required by BullMQ)
3143
*/
3244
const ioConnection = redisConnection.ioConnection
33-
return { ...ioConnection.options, maxRetriesPerRequest: null }
45+
46+
if (role === 'worker') {
47+
return { ...ioConnection.options, maxRetriesPerRequest: null }
48+
}
49+
50+
return { ...ioConnection.options, enableOfflineQueue: false }
3451
}
3552
}

packages/core/src/queue_manager.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ export class QueueManager<KnownQueues extends Record<string, QueueConfig> = Queu
7878
const queue = BullMqFactory.createQueue<DataType, ReturnType>(String(queueName), {
7979
prefix: this.config.defaultPrefix,
8080
...queueOptions,
81-
connection: this.connectionResolver.resolve(connection),
81+
connection: this.connectionResolver.resolve({ config: connection, role: 'queue' }),
8282
telemetry: new BullMQOtel('adonis-jobs'),
8383
})
8484

@@ -101,7 +101,7 @@ export class QueueManager<KnownQueues extends Record<string, QueueConfig> = Queu
101101
const queueEvents = BullMqFactory.createQueueEvents(String(queueName), {
102102
prefix: this.config.defaultPrefix,
103103
...queueOptions,
104-
connection: this.connectionResolver.resolve(connection),
104+
connection: this.connectionResolver.resolve({ config: connection, role: 'queue' }),
105105
telemetry: new BullMQOtel('adonis-jobs'),
106106
})
107107

@@ -114,7 +114,10 @@ export class QueueManager<KnownQueues extends Record<string, QueueConfig> = Queu
114114

115115
this.#flowProducer = BullMqFactory.createFlowProducer({
116116
prefix: this.config.defaultPrefix,
117-
connection: this.connectionResolver.resolve(this.config.connection),
117+
connection: this.connectionResolver.resolve({
118+
config: this.config.connection,
119+
role: 'queue',
120+
}),
118121
telemetry: new BullMQOtel('adonis-jobs'),
119122
})
120123

packages/core/src/worker/worker.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ export class Worker<KnownQueues extends Record<string, QueueConfig> = Queues> {
4242
*/
4343
#createBullWorker(): BullWorker {
4444
const queueConfig = this.#config.queues[this.#queueName]
45-
const connection = this.#connectionResolver.resolve(queueConfig.connection)
45+
const connection = this.#connectionResolver.resolve({
46+
config: queueConfig.connection,
47+
role: 'worker',
48+
})
4649

4750
return BullMqFactory.createWorker(
4851
String(this.#queueName),
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { test } from '@japa/runner'
2+
3+
import { ConnectionResolver } from '../../src/connection_resolver.js'
4+
import type { Config, QueueConnectionConfig } from '../../src/types/index.js'
5+
6+
function createFakeRedisService(ioOptions: Record<string, any> = {}) {
7+
const fakeIoConnection = { options: { host: '127.0.0.1', port: 6379, ...ioOptions } }
8+
return { connection: () => ({ ioConnection: fakeIoConnection }), fakeIoConnection }
9+
}
10+
11+
function createConfig(overrides?: Partial<Config<any>>): Config<any> {
12+
return {
13+
connection: { connectionName: 'main' } as QueueConnectionConfig,
14+
defaultQueue: 'default',
15+
queues: { default: {} },
16+
...overrides,
17+
}
18+
}
19+
20+
test.group('ConnectionResolver', () => {
21+
test('worker role sets maxRetriesPerRequest to null', ({ assert }) => {
22+
const redis = createFakeRedisService()
23+
const resolver = new ConnectionResolver(createConfig(), redis as any)
24+
25+
const result = resolver.resolve({ role: 'worker' }) as Record<string, any>
26+
27+
assert.equal(result.maxRetriesPerRequest, null)
28+
assert.notProperty(result, 'enableOfflineQueue')
29+
})
30+
31+
test('queue role disables offline queue', ({ assert }) => {
32+
const redis = createFakeRedisService()
33+
const resolver = new ConnectionResolver(createConfig(), redis as any)
34+
35+
const result = resolver.resolve({ role: 'queue' }) as Record<string, any>
36+
37+
assert.equal(result.enableOfflineQueue, false)
38+
assert.notProperty(result, 'maxRetriesPerRequest')
39+
})
40+
41+
test('defaults to queue role when no role is specified', ({ assert }) => {
42+
const redis = createFakeRedisService()
43+
const resolver = new ConnectionResolver(createConfig(), redis as any)
44+
45+
const result = resolver.resolve() as Record<string, any>
46+
47+
assert.equal(result.enableOfflineQueue, false)
48+
assert.notProperty(result, 'maxRetriesPerRequest')
49+
})
50+
51+
test('returns raw ioConnection when useSharedConnection is true', ({ assert }) => {
52+
const redis = createFakeRedisService()
53+
const resolver = new ConnectionResolver(
54+
createConfig({ useSharedConnection: true }),
55+
redis as any,
56+
)
57+
58+
const workerResult = resolver.resolve({ role: 'worker' })
59+
const queueResult = resolver.resolve({ role: 'queue' })
60+
61+
assert.strictEqual(workerResult, redis.fakeIoConnection as any)
62+
assert.strictEqual(queueResult, redis.fakeIoConnection as any)
63+
})
64+
65+
test('spreads ioConnection options into the result', ({ assert }) => {
66+
const redis = createFakeRedisService({ db: 2, password: 'secret' })
67+
const resolver = new ConnectionResolver(createConfig(), redis as any)
68+
69+
const result = resolver.resolve({ role: 'worker' }) as Record<string, any>
70+
71+
assert.equal(result.host, '127.0.0.1')
72+
assert.equal(result.port, 6379)
73+
assert.equal(result.db, 2)
74+
assert.equal(result.password, 'secret')
75+
})
76+
77+
test('uses custom connection config when provided', ({ assert }) => {
78+
const redis = createFakeRedisService()
79+
const customConfig = { connectionName: 'custom' } as QueueConnectionConfig
80+
const resolver = new ConnectionResolver(createConfig(), redis as any)
81+
82+
resolver.resolve({ config: customConfig, role: 'queue' })
83+
84+
// No error thrown means it resolved successfully
85+
assert.isTrue(true)
86+
})
87+
})

0 commit comments

Comments
 (0)