@@ -250,7 +250,7 @@ impl KeeperMetrics {
250
250
}
251
251
}
252
252
253
- #[ derive( Debug ) ]
253
+ #[ derive( Debug , Clone ) ]
254
254
pub struct BlockRange {
255
255
pub from : BlockNumber ,
256
256
pub to : BlockNumber ,
@@ -346,7 +346,8 @@ pub async fn run_keeper_threads(
346
346
)
347
347
. in_current_span ( ) ,
348
348
) ;
349
- // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
349
+
350
+ // Spawn a thread for block processing with configured delays
350
351
spawn (
351
352
process_new_blocks (
352
353
chain_state. clone ( ) ,
@@ -356,6 +357,7 @@ pub async fn run_keeper_threads(
356
357
chain_eth_config. escalation_policy . clone ( ) ,
357
358
metrics. clone ( ) ,
358
359
fulfilled_requests_cache. clone ( ) ,
360
+ chain_eth_config. block_delays . clone ( ) ,
359
361
)
360
362
. in_current_span ( ) ,
361
363
) ;
@@ -965,8 +967,10 @@ pub async fn watch_blocks(
965
967
}
966
968
}
967
969
968
- /// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
970
+ /// It waits on rx channel to receive block ranges and then calls process_block_range to process them
971
+ /// for each configured block delay.
969
972
#[ tracing:: instrument( skip_all) ]
973
+ #[ allow( clippy:: too_many_arguments) ]
970
974
pub async fn process_new_blocks (
971
975
chain_state : BlockchainState ,
972
976
mut rx : mpsc:: Receiver < BlockRange > ,
@@ -975,12 +979,14 @@ pub async fn process_new_blocks(
975
979
escalation_policy : EscalationPolicyConfig ,
976
980
metrics : Arc < KeeperMetrics > ,
977
981
fulfilled_requests_cache : Arc < RwLock < HashSet < u64 > > > ,
982
+ block_delays : Vec < u64 > ,
978
983
) {
979
984
tracing:: info!( "Waiting for new block ranges to process" ) ;
980
985
loop {
981
986
if let Some ( block_range) = rx. recv ( ) . await {
987
+ // Process blocks immediately first
982
988
process_block_range (
983
- block_range,
989
+ block_range. clone ( ) ,
984
990
Arc :: clone ( & contract) ,
985
991
gas_limit,
986
992
escalation_policy. clone ( ) ,
@@ -990,6 +996,25 @@ pub async fn process_new_blocks(
990
996
)
991
997
. in_current_span ( )
992
998
. await ;
999
+
1000
+ // Then process with each configured delay
1001
+ for delay in & block_delays {
1002
+ let adjusted_range = BlockRange {
1003
+ from : block_range. from . saturating_sub ( * delay) ,
1004
+ to : block_range. to . saturating_sub ( * delay) ,
1005
+ } ;
1006
+ process_block_range (
1007
+ adjusted_range,
1008
+ Arc :: clone ( & contract) ,
1009
+ gas_limit,
1010
+ escalation_policy. clone ( ) ,
1011
+ chain_state. clone ( ) ,
1012
+ metrics. clone ( ) ,
1013
+ fulfilled_requests_cache. clone ( ) ,
1014
+ )
1015
+ . in_current_span ( )
1016
+ . await ;
1017
+ }
993
1018
}
994
1019
}
995
1020
}
0 commit comments