Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ JWT_REFRESH_TOKEN_EXPIRY_IN_MINUTES=43200
# REDIS_KEY=
# REDIS_CA=
# REDIS_KEEP_ALIVE=

# Redis Connection Resilience Settings (fixes infinite hangs on connection drops)
# REDIS_CONNECT_TIMEOUT=10000 # Connection timeout in milliseconds
# REDIS_COMMAND_TIMEOUT=30000 # Command timeout in milliseconds (prevents waitUntilFinished hanging)
# REDIS_MAX_RETRIES_PER_REQUEST=3 # Maximum retries per Redis command
# REDIS_LAZY_CONNECT=true # Connect only when needed
# REDIS_KEEPALIVE_INTERVAL=30000 # Keep-alive interval in milliseconds

# ENABLE_BULLMQ_DASHBOARD=


Expand Down
21 changes: 19 additions & 2 deletions packages/server/src/queue/QueueManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import { ExpressAdapter } from '@bull-board/express'

const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue'

// Redis resilience configuration with environment variable support
const REDIS_CONNECT_TIMEOUT = process.env.REDIS_CONNECT_TIMEOUT ? parseInt(process.env.REDIS_CONNECT_TIMEOUT, 10) : 10000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not to hardcode a default value for these.

If these are set, we only pass it in, if not, we leave it

const REDIS_COMMAND_TIMEOUT = process.env.REDIS_COMMAND_TIMEOUT ? parseInt(process.env.REDIS_COMMAND_TIMEOUT, 10) : 30000
const REDIS_MAX_RETRIES_PER_REQUEST = process.env.REDIS_MAX_RETRIES_PER_REQUEST ? parseInt(process.env.REDIS_MAX_RETRIES_PER_REQUEST, 10) : 3
const REDIS_LAZY_CONNECT = process.env.REDIS_LAZY_CONNECT === 'false' ? false : true
const REDIS_KEEPALIVE_INTERVAL = process.env.REDIS_KEEPALIVE_INTERVAL ? parseInt(process.env.REDIS_KEEPALIVE_INTERVAL, 10) : 30000

type QUEUE_TYPE = 'prediction' | 'upsert'

export class QueueManager {
Expand Down Expand Up @@ -45,7 +52,12 @@ export class QueueManager {
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
: REDIS_KEEPALIVE_INTERVAL,
// Enhanced resilience settings
connectTimeout: REDIS_CONNECT_TIMEOUT,
commandTimeout: REDIS_COMMAND_TIMEOUT,
maxRetriesPerRequest: REDIS_MAX_RETRIES_PER_REQUEST,
lazyConnect: REDIS_LAZY_CONNECT
}
} else {
let tlsOpts = undefined
Expand All @@ -66,7 +78,12 @@ export class QueueManager {
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
: REDIS_KEEPALIVE_INTERVAL,
// Enhanced resilience settings
connectTimeout: REDIS_CONNECT_TIMEOUT,
commandTimeout: REDIS_COMMAND_TIMEOUT,
maxRetriesPerRequest: REDIS_MAX_RETRIES_PER_REQUEST,
lazyConnect: REDIS_LAZY_CONNECT
}
}
}
Expand Down