Skip to content

Commit 72d4a17

Browse files
authored
feat: allow to disable queue workers (#507)
1 parent c715a7b commit 72d4a17

File tree

3 files changed

+60
-1
lines changed

3 files changed

+60
-1
lines changed

src/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type StorageConfigType = {
5959
logflareApiKey?: string
6060
logflareSourceToken?: string
6161
pgQueueEnable: boolean
62+
pgQueueEnableWorkers?: boolean
6263
pgQueueConnectionURL?: string
6364
pgQueueDeleteAfterDays?: number
6465
pgQueueArchiveCompletedAfterSeconds?: number
@@ -303,6 +304,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
303304

304305
// Queue
305306
pgQueueEnable: getOptionalConfigFromEnv('PG_QUEUE_ENABLE', 'ENABLE_QUEUE_EVENTS') === 'true',
307+
pgQueueEnableWorkers: getOptionalConfigFromEnv('PG_QUEUE_WORKERS_ENABLE') !== 'false',
306308
pgQueueConnectionURL: getOptionalConfigFromEnv('PG_QUEUE_CONNECTION_URL'),
307309
pgQueueDeleteAfterDays: parseInt(
308310
getOptionalConfigFromEnv('PG_QUEUE_DELETE_AFTER_DAYS') || '2',

src/queue/queue.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export abstract class Queue {
2727
pgQueueDeleteAfterDays,
2828
pgQueueArchiveCompletedAfterSeconds,
2929
pgQueueRetentionDays,
30+
pgQueueEnableWorkers,
3031
} = getConfig()
3132

3233
let url = pgQueueConnectionURL ?? databaseURL
@@ -52,7 +53,9 @@ export abstract class Queue {
5253
expireInHours: 48,
5354
})
5455

55-
registerWorkers()
56+
if (pgQueueEnableWorkers) {
57+
registerWorkers()
58+
}
5659

5760
Queue.pgBoss.on('error', (error) => {
5861
logSchema.error(logger, '[Queue] error', {

src/worker.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { Queue } from './queue'
2+
import { logger, logSchema } from './monitoring'
3+
import adminApp from './admin-app'
4+
import { getConfig } from './config'
5+
6+
export async function main() {
7+
const { requestTraceHeader, adminPort, host } = getConfig()
8+
9+
logger.info('[Queue] Starting Queue Worker')
10+
const queue = await Queue.init()
11+
12+
const server = adminApp({
13+
logger,
14+
disableRequestLogging: true,
15+
requestIdHeader: requestTraceHeader,
16+
})
17+
18+
process.on('SIGTERM', async () => {
19+
logger.info('[Worker] Stopping')
20+
await server.close()
21+
await Queue.stop()
22+
})
23+
24+
await server.listen({ port: adminPort, host })
25+
26+
return new Promise<void>((resolve, reject) => {
27+
queue.on('error', (err) => {
28+
logger.info('[Queue] Error', err)
29+
reject(err)
30+
})
31+
32+
queue.on('stopped', () => {
33+
logger.info('[Queue] Stopping')
34+
resolve()
35+
})
36+
})
37+
}
38+
39+
process.on('uncaughtException', (e) => {
40+
logSchema.error(logger, 'uncaught exception', {
41+
type: 'uncaughtException',
42+
error: e,
43+
})
44+
logger.flush()
45+
process.exit(1)
46+
})
47+
48+
main()
49+
.then(() => {
50+
logger.info('[Queue] Worker Exited Successfully')
51+
})
52+
.catch(() => {
53+
process.exit(1)
54+
})

0 commit comments

Comments
 (0)