@@ -335,7 +335,7 @@ pub async fn run_keeper_threads(
335335        . in_current_span ( ) , 
336336    ) ; 
337337
338-     let  ( tx,  rx )  = mpsc:: channel :: < BlockRange > ( 1000 ) ; 
338+     let  ( tx,  _rx )  = mpsc:: channel :: < BlockRange > ( 1000 ) ; 
339339    // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel. 
340340    spawn ( 
341341        watch_blocks_wrapper ( 
@@ -346,16 +346,57 @@ pub async fn run_keeper_threads(
346346        ) 
347347        . in_current_span ( ) , 
348348    ) ; 
349-     // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks. 
349+     // Create channels for both immediate and delayed block processing 
350+     let  ( tx_immediate,  rx_immediate)  = mpsc:: channel :: < BlockRange > ( 1000 ) ; 
351+     let  ( tx_delayed,  rx_delayed)  = mpsc:: channel :: < BlockRange > ( 1000 ) ; 
352+ 
353+     // Spawn a thread to watch for new blocks and send the range of blocks to both channels 
354+     spawn ( 
355+         watch_blocks_wrapper ( 
356+             chain_state. clone ( ) , 
357+             latest_safe_block, 
358+             tx_immediate, 
359+             chain_eth_config. geth_rpc_wss . clone ( ) , 
360+         ) 
361+         . in_current_span ( ) , 
362+     ) ; 
363+ 
364+     // Clone the tx_delayed channel for the second watch_blocks_wrapper 
365+     spawn ( 
366+         watch_blocks_wrapper ( 
367+             chain_state. clone ( ) , 
368+             latest_safe_block, 
369+             tx_delayed, 
370+             chain_eth_config. geth_rpc_wss . clone ( ) , 
371+         ) 
372+         . in_current_span ( ) , 
373+     ) ; 
374+ 
375+     // Spawn a thread for immediate block processing 
350376    spawn ( 
351377        process_new_blocks ( 
352378            chain_state. clone ( ) , 
353-             rx, 
379+             rx_immediate, 
380+             Arc :: clone ( & contract) , 
381+             gas_limit, 
382+             chain_eth_config. escalation_policy . clone ( ) , 
383+             metrics. clone ( ) , 
384+             fulfilled_requests_cache. clone ( ) , 
385+         ) 
386+         . in_current_span ( ) , 
387+     ) ; 
388+ 
389+     // Spawn a thread for delayed block processing 
390+     spawn ( 
391+         process_new_blocks_delayed ( 
392+             chain_state. clone ( ) , 
393+             rx_delayed, 
354394            Arc :: clone ( & contract) , 
355395            gas_limit, 
356396            chain_eth_config. escalation_policy . clone ( ) , 
357397            metrics. clone ( ) , 
358398            fulfilled_requests_cache. clone ( ) , 
399+             5 , 
359400        ) 
360401        . in_current_span ( ) , 
361402    ) ; 
@@ -994,6 +1035,43 @@ pub async fn process_new_blocks(
9941035    } 
9951036} 
9961037
1038+ #[ tracing:: instrument( skip_all) ]  
1039+ pub  async  fn  process_new_blocks_delayed ( 
1040+     chain_state :  BlockchainState , 
1041+     mut  rx :  mpsc:: Receiver < BlockRange > , 
1042+     contract :  Arc < InstrumentedSignablePythContract > , 
1043+     gas_limit :  U256 , 
1044+     escalation_policy :  EscalationPolicyConfig , 
1045+     metrics :  Arc < KeeperMetrics > , 
1046+     fulfilled_requests_cache :  Arc < RwLock < HashSet < u64 > > > , 
1047+     delay_blocks :  u64 , 
1048+ )  { 
1049+     tracing:: info!( 
1050+         "Waiting for new block ranges to process with {} block delay" , 
1051+         delay_blocks
1052+     ) ; 
1053+     loop  { 
1054+         if  let  Some ( block_range)  = rx. recv ( ) . await  { 
1055+             let  from_block_after_delay = block_range. from  + delay_blocks; 
1056+             let  adjusted_range = BlockRange  { 
1057+                 from :  from_block_after_delay, 
1058+                 to :  block_range. to  + delay_blocks, 
1059+             } ; 
1060+             process_block_range ( 
1061+                 adjusted_range, 
1062+                 Arc :: clone ( & contract) , 
1063+                 gas_limit, 
1064+                 escalation_policy. clone ( ) , 
1065+                 chain_state. clone ( ) , 
1066+                 metrics. clone ( ) , 
1067+                 fulfilled_requests_cache. clone ( ) , 
1068+             ) 
1069+             . in_current_span ( ) 
1070+             . await ; 
1071+         } 
1072+     } 
1073+ } 
1074+ 
9971075/// Processes the backlog_range for a chain. 
9981076#[ tracing:: instrument( skip_all) ]  
9991077pub  async  fn  process_backlog ( 
0 commit comments