Skip to content

Commit c79f28f

Browse files
feat(watcher): migrate from Agenda to BullMQ for job queuing (#35)
* feat(watcher): migrate from Agenda to BullMQ for job queuing - Replace Agenda with BullMQ to resolve MongoDB version conflicts - Agenda 5.0.0 only supports MongoDB 4.x, causing compatibility issues with Mongoose 8.16.0 - Implement BullMQ loader with Redis backend for job processing - Migrate syncWatcher and replayer controllers to use BullMQ - Add proper connection management and cleanup functions - Remove deprecated Agenda dependencies and loader - Maintain same job scheduling functionality with improved reliability * fix: add job name in worker logs * fix: close bullmq connections to redis before stopping the process on critical errors * fix: remove locktime extension Co-authored-by: pjt <[email protected]> * refactor(watcher): remove unused parameter --------- Co-authored-by: Pierre Jeanjacquot <[email protected]>
1 parent 1936526 commit c79f28f

File tree

8 files changed

+845
-509
lines changed

8 files changed

+845
-509
lines changed

watcher/package-lock.json

Lines changed: 670 additions & 417 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

watcher/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
"dependencies": {
3636
"@iexec/poco": "^5.5.0",
3737
"@socket.io/redis-adapter": "^8.3.0",
38-
"agenda": "^5.0.0",
38+
"bullmq": "^5.58.4",
3939
"debug": "^4.4.0",
4040
"dotenv": "^16.5.0",
4141
"ethers": "^6.13.5",

watcher/src/app.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { startReplayer, stopReplayer } from './controllers/replayer.js';
1919
import { getNextBlockToProcess, setLastBlock } from './services/counter.js';
2020
import { getLogger, APP_NAMESPACE } from './utils/logger.js';
2121
import { errorHandler } from './utils/error.js';
22+
import { clearAllQueues } from './loaders/bullmq.js';
2223

2324
const logger = getLogger(APP_NAMESPACE);
2425

@@ -92,6 +93,10 @@ const stop = async () => {
9293
logger.log('STOPPING...');
9394
await Promise.all([stopSyncWatcher(), stopReplayer()]);
9495
unsubscribeAllEvents();
96+
97+
// Clear BullMQ queues instead of closing connections
98+
await clearAllQueues();
99+
95100
logger.log('STOPPED');
96101
} catch (error) {
97102
errorHandler(error, {

watcher/src/controllers/replayer.js

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as config from '../config.js';
2-
import { getAgenda } from '../loaders/agenda.js';
2+
import { getQueue, getWorker } from '../loaders/bullmq.js';
33
import * as ethereum from '../loaders/ethereum.js';
44
import { replayPastEvents } from './ethEventsWatcher.js';
55
import {
@@ -12,7 +12,6 @@ import { getBlockNumber } from '../utils/eth-utils.js';
1212
import { errorHandler } from '../utils/error.js';
1313
import { traceAll } from '../utils/trace.js';
1414

15-
const { chainId } = config.chain;
1615
const { replayInterval } = config.runtime;
1716

1817
const logger = getLogger('controllers:replayer');
@@ -59,34 +58,43 @@ const _replayPastOnly = async ({
5958
};
6059

6160
const startReplayer = async () => {
62-
const agenda = await getAgenda(chainId);
63-
agenda.define(
61+
const queue = getQueue(EVENT_REPLAY_JOB);
62+
63+
// Create worker to process jobs
64+
getWorker(EVENT_REPLAY_JOB, async () => {
65+
try {
66+
await _replayPastOnly({
67+
handleIndexedBlock: async (blockNumber) => {
68+
await setCheckpointBlock(blockNumber);
69+
},
70+
});
71+
} catch (error) {
72+
errorHandler(error, { type: 'replay-job' });
73+
throw error;
74+
}
75+
});
76+
77+
// Schedule recurring job
78+
await queue.add(
6479
EVENT_REPLAY_JOB,
65-
{ lockLifetime: 10 * 60 * 1000 },
66-
async (job) => {
67-
try {
68-
await _replayPastOnly({
69-
handleIndexedBlock: async (blockNumber) => {
70-
await setCheckpointBlock(blockNumber);
71-
// reset job lock after every iteration
72-
await job.touch();
73-
},
74-
});
75-
} catch (error) {
76-
errorHandler(error, { type: 'replay-job' });
77-
throw error;
78-
}
80+
{},
81+
{
82+
repeat: {
83+
every: replayInterval * 1000, // Convert seconds to milliseconds
84+
},
7985
},
8086
);
81-
await agenda.every(`${replayInterval} seconds`, EVENT_REPLAY_JOB);
87+
8288
logger.log(
8389
`${EVENT_REPLAY_JOB} jobs added (run every ${replayInterval} seconds)`,
8490
);
8591
};
8692

8793
const stopReplayer = async () => {
88-
const agenda = await getAgenda(chainId);
89-
await agenda.cancel({ name: EVENT_REPLAY_JOB });
94+
// Clear the specific queue instead of obliterating
95+
const queue = getQueue(EVENT_REPLAY_JOB);
96+
await queue.obliterate({ force: true });
97+
logger.log(`Stopped ${EVENT_REPLAY_JOB} jobs`);
9098
};
9199

92100
const replayPastOnly = traceAll(_replayPastOnly, { logger });

watcher/src/controllers/syncWatcher.js

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import * as config from '../config.js';
2-
import { getAgenda } from '../loaders/agenda.js';
2+
import { getQueue, getWorker } from '../loaders/bullmq.js';
33
import { getProvider, getRpcProvider } from '../loaders/ethereum.js';
44
import { getLogger } from '../utils/logger.js';
55
import { sleep } from '../utils/utils.js';
66
import { errorHandler } from '../utils/error.js';
77
import { getBlockNumber } from '../utils/eth-utils.js';
88

9-
const { chainId } = config.chain;
109
const { checkSyncInterval } = config.runtime;
1110

1211
const logger = getLogger('controllers:syncWatcher');
@@ -15,7 +14,7 @@ const SYNC_WATCHER_JOB = 'watch-eth-node';
1514

1615
const MAX_ERROR_COUNT = 3;
1716

18-
const checkSync = () => async () => {
17+
const checkSync = async () => {
1918
let errorCount = 0;
2019
let isSync = false;
2120
await sleep(config.runtime.syncWatcherInterval);
@@ -57,17 +56,32 @@ const checkSync = () => async () => {
5756
};
5857

5958
const startSyncWatcher = async () => {
60-
const agenda = await getAgenda(chainId);
61-
agenda.define(SYNC_WATCHER_JOB, { lockLifetime: 16000 }, checkSync());
62-
await agenda.every(`${checkSyncInterval} seconds`, SYNC_WATCHER_JOB);
59+
const queue = getQueue(SYNC_WATCHER_JOB);
60+
61+
// Create worker to process jobs
62+
getWorker(SYNC_WATCHER_JOB, checkSync);
63+
64+
// Schedule recurring job
65+
await queue.add(
66+
SYNC_WATCHER_JOB,
67+
{},
68+
{
69+
repeat: {
70+
every: checkSyncInterval * 1000, // Convert seconds to milliseconds
71+
},
72+
},
73+
);
74+
6375
logger.log(
6476
`${SYNC_WATCHER_JOB} jobs added (run every ${checkSyncInterval} seconds)`,
6577
);
6678
};
6779

6880
const stopSyncWatcher = async () => {
69-
const agenda = await getAgenda(chainId);
70-
await agenda.cancel({ name: SYNC_WATCHER_JOB });
81+
// Clear the specific queue instead of obliterating
82+
const queue = getQueue(SYNC_WATCHER_JOB);
83+
await queue.obliterate({ force: true });
84+
logger.log(`Stopped ${SYNC_WATCHER_JOB} jobs`);
7185
};
7286

7387
export { startSyncWatcher, stopSyncWatcher };

watcher/src/loaders/agenda.js

Lines changed: 0 additions & 61 deletions
This file was deleted.

watcher/src/loaders/bullmq.js

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { Queue, Worker } from 'bullmq';
2+
import { redis as redisConfig } from '../config.js';
3+
import { getLogger } from '../utils/logger.js';
4+
5+
const logger = getLogger('bullmq');
6+
7+
// Cache for queues and workers
8+
const queues = new Map();
9+
const workers = new Map();
10+
11+
/**
12+
* Creates or returns a cached BullMQ Queue instance.
13+
* Uses Redis for job queuing instead of MongoDB.
14+
*/
15+
const getQueue = (name, options = {}) => {
16+
if (queues.has(name)) {
17+
return queues.get(name);
18+
}
19+
20+
const queue = new Queue(name, {
21+
connection: redisConfig,
22+
defaultJobOptions: {
23+
removeOnComplete: 100, // Keep last 100 completed jobs
24+
removeOnFail: 50, // Keep last 50 failed jobs
25+
attempts: 3, // Retry failed jobs up to 3 times
26+
backoff: {
27+
type: 'exponential',
28+
delay: 2000,
29+
},
30+
},
31+
...options,
32+
});
33+
34+
queues.set(name, queue);
35+
logger.log(`Created queue: ${name}`);
36+
return queue;
37+
};
38+
39+
/**
40+
* Creates or returns a cached BullMQ Worker instance.
41+
*/
42+
const getWorker = (name, processor, options = {}) => {
43+
if (workers.has(name)) {
44+
return workers.get(name);
45+
}
46+
47+
const worker = new Worker(name, processor, {
48+
connection: redisConfig,
49+
concurrency: 1, // Process one job at a time
50+
...options,
51+
});
52+
53+
// Handle worker events
54+
worker.on('completed', (job) => {
55+
logger.debug(`Job ${job.name} ${job.id} completed successfully`);
56+
});
57+
58+
worker.on('failed', (job, err) => {
59+
logger.warn(`Job ${job.name} ${job.id} failed:`, err.message);
60+
});
61+
62+
worker.on('error', (err) => {
63+
logger.error(`Worker error:`, err);
64+
});
65+
66+
workers.set(name, worker);
67+
logger.log(`Created worker: ${name}`);
68+
return worker;
69+
};
70+
71+
/**
72+
* Gracefully closes all queues and workers.
73+
*/
74+
const closeAll = async ({ force = false } = {}) => {
75+
logger.log('Closing all BullMQ connections...');
76+
77+
const closePromises = [];
78+
79+
// Close all workers
80+
for (const worker of workers.values()) {
81+
closePromises.push(worker.close(force));
82+
}
83+
84+
// Close all queues
85+
for (const queue of queues.values()) {
86+
closePromises.push(queue.close(force));
87+
}
88+
89+
await Promise.all(closePromises);
90+
91+
// Clear the caches
92+
queues.clear();
93+
workers.clear();
94+
95+
logger.log('All BullMQ connections closed');
96+
};
97+
98+
/**
99+
* Clears all jobs from all queues without closing connections.
100+
* This is useful for testing when you want to reset the state.
101+
*/
102+
const clearAllQueues = async () => {
103+
logger.log('Clearing all queues...');
104+
105+
const clearPromises = [];
106+
107+
for (const queue of queues.values()) {
108+
clearPromises.push(queue.obliterate({ force: true }));
109+
}
110+
111+
await Promise.all(clearPromises);
112+
logger.log('All queues cleared');
113+
};
114+
115+
export { getQueue, getWorker, closeAll, clearAllQueues };

watcher/src/utils/error.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { closeAll } from '../loaders/bullmq.js';
12
import { getLogger, APP_NAMESPACE } from './logger.js';
23
import { sleep } from './utils.js';
34

@@ -61,7 +62,8 @@ const recoverOnCriticalError = async () => {
6162
logError(
6263
`A critical error has occurred - Stopping process in ${GRACE_PERIOD}ms`,
6364
);
64-
sleep(GRACE_PERIOD);
65+
closeAll({ force: true });
66+
await sleep(GRACE_PERIOD);
6567
logError('A critical error has occurred - Stopping process now');
6668
process.exit(1);
6769
}

0 commit comments

Comments
 (0)