11import cron from "node-cron" ;
2- import { getConfig } from "../../utils/cache/getConfig" ;
3- import { env } from "../../utils/env" ;
42import { getBlockTimeSeconds } from "../../utils/indexer/getBlockTime" ;
53import { logger } from "../../utils/logger" ;
6- import { createChainIndexerTask } from "../tasks/chainIndexer" ;
4+ import { handleContractSubscriptions } from "../tasks/chainIndexer" ;
75
6+ // @TODO : Move all worker logic to Bullmq to better handle multiple hosts.
87export const INDEXER_REGISTRY = { } as Record < number , cron . ScheduledTask > ;
98
109export const addChainIndexer = async ( chainId : number ) => {
1110 if ( INDEXER_REGISTRY [ chainId ] ) {
11+ return ;
12+ }
13+
14+ // Estimate the block time in the last 100 blocks. Default to 2 second block times.
15+ let blockTimeSeconds : number ;
16+ try {
17+ blockTimeSeconds = await getBlockTimeSeconds ( chainId , 100 ) ;
18+ } catch ( error ) {
1219 logger ( {
1320 service : "worker" ,
14- level : "warn" ,
15- message : `Chain Indexer already exists: ${ chainId } ` ,
21+ level : "error" ,
22+ message : `Could not estimate block time for chain ${ chainId } ` ,
23+ error,
1624 } ) ;
17- return ;
25+ blockTimeSeconds = 2 ;
1826 }
19-
20- let processStarted = false ;
21- const config = await getConfig ( ) ;
22-
23- // Estimate block time.
24- const blockTimeSeconds = await getBlockTimeSeconds ( chainId ) ;
25-
26- const blocksIn5Seconds = Math . round ( ( 1 / blockTimeSeconds ) * 5 ) ;
27- const maxBlocksToIndex = Math . max (
28- config . maxBlocksToIndex ,
29- blocksIn5Seconds * 4 ,
30- ) ;
31-
32- // Compute block offset based on delay.
33- // Example: 10s delay with a 3s block time = 4 blocks offset
34- const toBlockOffset = env . CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS
35- ? Math . ceil ( env . CONTRACT_SUBSCRIPTIONS_DELAY_SECONDS / blockTimeSeconds )
36- : 0 ;
37-
38- const handler = await createChainIndexerTask ( {
39- chainId,
40- maxBlocksToIndex,
41- toBlockOffset,
42- } ) ;
43-
4427 const cronSchedule = createScheduleSeconds (
4528 Math . max ( Math . round ( blockTimeSeconds ) , 1 ) ,
4629 ) ;
47-
4830 logger ( {
4931 service : "worker" ,
5032 level : "info" ,
51- message : `Indexing contracts on chainId: ${ chainId } with schedule: ${ cronSchedule } , max blocks to index: ${ maxBlocksToIndex } ` ,
33+ message : `Indexing contracts on chain ${ chainId } with schedule: ${ cronSchedule } ` ,
5234 } ) ;
5335
36+ let inProgress = false ;
37+
5438 const task = cron . schedule ( cronSchedule , async ( ) => {
55- if ( ! processStarted ) {
56- processStarted = true ;
39+ if ( inProgress ) {
40+ return ;
41+ }
5742
58- try {
59- await handler ( ) ;
60- } catch ( error ) {
61- // do nothing
62- } finally {
63- processStarted = false ;
64- }
43+ inProgress = true ;
44+ try {
45+ await handleContractSubscriptions ( chainId ) ;
46+ } catch ( error ) {
47+ logger ( {
48+ service : "worker" ,
49+ level : "error" ,
50+ message : `Failed to index on chain ${ chainId } ` ,
51+ error,
52+ } ) ;
53+ } finally {
54+ inProgress = false ;
6555 }
6656 } ) ;
6757
@@ -70,13 +60,7 @@ export const addChainIndexer = async (chainId: number) => {
7060
7161export const removeChainIndexer = async ( chainId : number ) => {
7262 const task = INDEXER_REGISTRY [ chainId ] ;
73-
7463 if ( ! task ) {
75- logger ( {
76- service : "worker" ,
77- level : "warn" ,
78- message : `Chain Indexer doesn't exist: ${ chainId } ` ,
79- } ) ;
8064 return ;
8165 }
8266
0 commit comments