Skip to content

Commit dba69f0

Browse files
committed
fix(redis): separate BullMQ queue and worker connections
BullMQ Workers use blocking commands (BRPOPLPUSH) that require a dedicated connection. Sharing connections caused command starvation and timeouts. - Separate queueConnection and workerConnection - Fix undefined config.db in Redis logs - Use structured logger instead of console.error Fixes production Redis MISCONF errors.
1 parent aab9707 commit dba69f0

File tree

3 files changed

+48
-27
lines changed

3 files changed

+48
-27
lines changed

packages/hocuspocus.server/src/config/hocuspocus.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { StoreDocumentQueue } from '../lib/queue'
88
import { HealthCheck } from '../extensions/health.extension'
99
import { RedisSubscriberExtension } from '../extensions/redis-subscriber.extension'
1010
import { prisma } from '../lib/prisma'
11+
import { dbLogger } from '../lib/logger'
1112

1213
export { Database }
1314

@@ -78,7 +79,7 @@ const configureExtensions = () => {
7879
})
7980
return doc ? doc.data : generateDefaultState()
8081
} catch (err) {
81-
console.error('Error fetching data:', err)
82+
dbLogger.error({ err }, 'Error fetching document data')
8283
await prisma.$disconnect()
8384
throw err
8485
}

packages/hocuspocus.server/src/lib/queue.ts

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,38 @@ async function generateUniqueSlug(baseSlug: string): Promise<string> {
1212
return `${baseSlug}-${Date.now()}-${Math.random().toString(36).substr(2, 5)}`
1313
}
1414

15-
// BullMQ requires ioredis - create dedicated connection for queues
16-
// Use centralized Redis config with BullMQ-specific settings
17-
const connection = createRedisConnection({
15+
// BullMQ connection options (shared base config)
16+
const bullmqConnectionOptions = {
1817
maxRetriesPerRequest: null, // BullMQ requirement - allows unlimited retries
19-
enableReadyCheck: true, // Enable to ensure connection is ready before sending commands
20-
enableOfflineQueue: true, // Enable to prevent command timeouts during reconnection
21-
// Explicit timeout for BullMQ operations (longer than default for heavy operations)
18+
enableReadyCheck: true,
19+
enableOfflineQueue: true,
2220
commandTimeout: parseInt(process.env.REDIS_COMMAND_TIMEOUT || '60000', 10),
23-
// Connection pooling settings
2421
connectTimeout: parseInt(process.env.REDIS_CONNECT_TIMEOUT || '30000', 10),
2522
keepAlive: parseInt(process.env.REDIS_KEEPALIVE || '30000', 10)
26-
})
23+
}
24+
25+
// Queue connection (non-blocking operations)
26+
const queueConnection = createRedisConnection(bullmqConnectionOptions)
2727

28-
if (!connection) {
29-
queueLogger.error('Failed to create Redis connection for BullMQ')
28+
if (!queueConnection) {
29+
queueLogger.error('Failed to create Redis connection for BullMQ Queue')
3030
throw new Error('Redis configuration required for queue operations')
3131
}
3232

33+
// Worker connection (blocking operations - MUST be separate)
34+
// BullMQ uses BRPOPLPUSH which blocks the connection
35+
const createWorkerConnection = () => {
36+
const conn = createRedisConnection(bullmqConnectionOptions)
37+
if (!conn) {
38+
queueLogger.error('Failed to create Redis connection for BullMQ Worker')
39+
throw new Error('Redis configuration required for worker operations')
40+
}
41+
return conn
42+
}
43+
3344
// Main queue for storing documents
3445
export const StoreDocumentQueue = new Queue<StoreDocumentData>('store-documents', {
35-
connection,
46+
connection: queueConnection,
3647
defaultJobOptions: {
3748
attempts: 5, // Increased retry attempts for better reliability
3849
backoff: {
@@ -49,7 +60,7 @@ export const StoreDocumentQueue = new Queue<StoreDocumentData>('store-documents'
4960

5061
// Dead Letter Queue for permanently failed jobs
5162
export const DeadLetterQueue = new Queue<StoreDocumentData>('store-documents-dlq', {
52-
connection,
63+
connection: queueConnection,
5364
defaultJobOptions: {
5465
removeOnComplete: {
5566
count: 500,
@@ -67,6 +78,8 @@ StoreDocumentQueue.on('error', (err: Error) => {
6778
// Worker to process document storage jobs
6879
export const createDocumentWorker = () => {
6980
const redisPublisher = getRedisPublisher()
81+
// Worker MUST have dedicated connection (uses blocking commands)
82+
const workerConnection = createWorkerConnection()
7083

7184
const worker = new Worker<StoreDocumentData>(
7285
'store-documents',
@@ -119,7 +132,10 @@ export const createDocumentWorker = () => {
119132
})
120133

121134
const duration = Date.now() - startTime
122-
queueLogger.info({ jobId: job.id, duration: `${duration}ms` }, 'Document stored successfully')
135+
queueLogger.info(
136+
{ jobId: job.id, duration: `${duration}ms` },
137+
'Document stored successfully'
138+
)
123139

124140
// Publish save confirmation to document-specific Redis channel
125141
if (redisPublisher) {
@@ -152,7 +168,7 @@ export const createDocumentWorker = () => {
152168
}
153169
},
154170
{
155-
connection,
171+
connection: workerConnection,
156172
concurrency: parseInt(process.env.BULLMQ_CONCURRENCY || '5', 10),
157173
limiter: {
158174
max: parseInt(process.env.BULLMQ_RATE_LIMIT_MAX || '300', 10),

packages/hocuspocus.server/src/lib/redis.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ const getRedisConfig = () => {
2525
const host = process.env.REDIS_HOST
2626
const port = process.env.REDIS_PORT
2727

28-
2928
if (!host || !port) {
3029
return null
3130
}
@@ -34,7 +33,6 @@ const getRedisConfig = () => {
3433
host,
3534
port: parseInt(port, 10),
3635

37-
3836
// Connection settings
3937
lazyConnect: false, // Connect immediately on creation
4038
connectTimeout: parseInt(process.env.REDIS_CONNECT_TIMEOUT || '20000', 10),
@@ -95,13 +93,15 @@ export const getRedisClient = (): RedisClient | null => {
9593
}
9694

9795
if (!redis) {
98-
redisLogger.info({
99-
host: config.host,
100-
port: config.port,
101-
db: config.db,
102-
lazyConnect: config.lazyConnect,
103-
connectTimeout: config.connectTimeout
104-
}, 'Creating new Redis client...')
96+
redisLogger.info(
97+
{
98+
host: config.host,
99+
port: config.port,
100+
lazyConnect: config.lazyConnect,
101+
connectTimeout: config.connectTimeout
102+
},
103+
'Creating new Redis client...'
104+
)
105105

106106
redis = new Redis(config)
107107

@@ -111,7 +111,7 @@ export const getRedisClient = (): RedisClient | null => {
111111
})
112112

113113
redis.on('ready', () => {
114-
redisLogger.info({ host: config.host, port: config.port, db: config.db }, 'Redis ready')
114+
redisLogger.info({ host: config.host, port: config.port }, 'Redis ready')
115115
})
116116

117117
redis.on('error', (err: Error) => {
@@ -140,7 +140,10 @@ export const getRedisClient = (): RedisClient | null => {
140140
}
141141

142142
// Wait for Redis client to be ready (with auto-connect, it should be connecting already)
143-
export const waitForRedisReady = async (client: RedisClient, timeoutMs = 10000): Promise<boolean> => {
143+
export const waitForRedisReady = async (
144+
client: RedisClient,
145+
timeoutMs = 10000
146+
): Promise<boolean> => {
144147
if (client.status === 'ready') {
145148
return true
146149
}
@@ -238,7 +241,8 @@ export const createRedisConnection = (options: Partial<RedisOptions> = {}): Redi
238241

239242
// BullMQ needs higher timeout for long-running operations
240243
// Use explicit timeout from options if provided, otherwise use env var or default
241-
const commandTimeout = options.commandTimeout ?? parseInt(process.env.REDIS_COMMAND_TIMEOUT || '60000', 10)
244+
const commandTimeout =
245+
options.commandTimeout ?? parseInt(process.env.REDIS_COMMAND_TIMEOUT || '60000', 10)
242246

243247
return new Redis({
244248
...config,

0 commit comments

Comments
 (0)