@@ -36,7 +36,10 @@ use fuel_core_storage::tables::{
3636 ContractsState ,
3737 Messages ,
3838} ;
39- use fuel_core_types:: fuel_types:: BlockHeight ;
39+ use fuel_core_types:: {
40+ blockchain:: primitives:: DaBlockHeight ,
41+ fuel_types:: BlockHeight ,
42+ } ;
4043use std:: path:: PathBuf ;
4144
4245#[ derive( Clone , Debug , Eq , PartialEq ) ]
@@ -215,7 +218,7 @@ impl CombinedDatabase {
215218 ) ?;
216219 let relayer = Database :: open_rocksdb (
217220 path,
218- StateRewindPolicy :: NoRewind ,
221+ state_rewind_policy ,
219222 DatabaseConfig {
220223 max_fds,
221224 ..database_config
@@ -421,14 +424,18 @@ impl CombinedDatabase {
421424
422425 let gas_price_chain_height =
423426 self . gas_price ( ) . latest_height_from_metadata ( ) ?;
427+ let gas_price_rolled_back =
428+ is_equal_or_none ( gas_price_chain_height, target_block_height) ;
424429
425- let gas_price_rolled_back = gas_price_chain_height. is_none ( )
426- || gas_price_chain_height. expect ( "We checked height before" )
427- == target_block_height;
430+ let compression_db_height =
431+ self . compression ( ) . latest_height_from_metadata ( ) ?;
432+ let compression_db_rolled_back =
433+ is_equal_or_none ( compression_db_height, target_block_height) ;
428434
429435 if on_chain_height == target_block_height
430436 && off_chain_height == target_block_height
431437 && gas_price_rolled_back
438+ && compression_db_rolled_back
432439 {
433440 break ;
434441 }
@@ -450,7 +457,16 @@ impl CombinedDatabase {
450457 if let Some ( gas_price_chain_height) = gas_price_chain_height {
451458 if gas_price_chain_height < target_block_height {
452459 return Err ( anyhow:: anyhow!(
453- "gas-price-chain database height({gas_price_chain_height}) \
460+ "gas-price database height({gas_price_chain_height}) \
461+ is less than target height({target_block_height})"
462+ ) ) ;
463+ }
464+ }
465+
466+ if let Some ( compression_db_height) = compression_db_height {
467+ if compression_db_height < target_block_height {
468+ return Err ( anyhow:: anyhow!(
469+ "compression database height({compression_db_height}) \
454470 is less than target height({target_block_height})"
455471 ) ) ;
456472 }
@@ -469,6 +485,55 @@ impl CombinedDatabase {
469485 self . gas_price ( ) . rollback_last_block ( ) ?;
470486 }
471487 }
488+
489+ if let Some ( compression_db_height) = compression_db_height {
490+ if compression_db_height > target_block_height {
491+ self . compression ( ) . rollback_last_block ( ) ?;
492+ }
493+ }
494+ }
495+
496+ if shutdown_listener. is_cancelled ( ) {
497+ return Err ( anyhow:: anyhow!(
498+ "Stop the rollback due to shutdown signal received"
499+ ) ) ;
500+ }
501+
502+ Ok ( ( ) )
503+ }
504+
505+ /// Rollbacks the state of the relayer to a specific block height.
506+ pub fn rollback_relayer_to < S > (
507+ & self ,
508+ target_da_height : DaBlockHeight ,
509+ shutdown_listener : & mut S ,
510+ ) -> anyhow:: Result < ( ) >
511+ where
512+ S : ShutdownListener ,
513+ {
514+ while !shutdown_listener. is_cancelled ( ) {
515+ let relayer_db_height = self . relayer ( ) . latest_height_from_metadata ( ) ?;
516+ let relayer_db_rolled_back =
517+ is_equal_or_none ( relayer_db_height, target_da_height) ;
518+
519+ if relayer_db_rolled_back {
520+ break ;
521+ }
522+
523+ if let Some ( relayer_db_height) = relayer_db_height {
524+ if relayer_db_height < target_da_height {
525+ return Err ( anyhow:: anyhow!(
526+ "relayer database height({relayer_db_height}) \
527+ is less than target height({target_da_height})"
528+ ) ) ;
529+ }
530+ }
531+
532+ if let Some ( relayer_db_height) = relayer_db_height {
533+ if relayer_db_height > target_da_height {
534+ self . relayer ( ) . rollback_last_block ( ) ?;
535+ }
536+ }
472537 }
473538
474539 if shutdown_listener. is_cancelled ( ) {
@@ -557,12 +622,19 @@ impl CombinedGenesisDatabase {
557622 }
558623}
559624
625+ fn is_equal_or_none < T : PartialEq > ( maybe_left : Option < T > , right : T ) -> bool {
626+ maybe_left. map ( |left| left == right) . unwrap_or ( true )
627+ }
628+
560629#[ allow( non_snake_case) ]
561630#[ cfg( feature = "backup" ) ]
562631#[ cfg( test) ]
563632mod tests {
564633 use super :: * ;
565- use fuel_core_storage:: StorageAsMut ;
634+ use fuel_core_storage:: {
635+ StorageAsMut ,
636+ StorageAsRef ,
637+ } ;
566638 use fuel_core_types:: {
567639 entities:: coins:: coin:: CompressedCoin ,
568640 fuel_tx:: UtxoId ,
@@ -603,7 +675,7 @@ mod tests {
603675 )
604676 . unwrap ( ) ;
605677
606- let mut restored_on_chain_db = restored_db. on_chain ( ) ;
678+ let restored_on_chain_db = restored_db. on_chain ( ) ;
607679 let restored_value = restored_on_chain_db
608680 . storage :: < Coins > ( )
609681 . get ( & key)
0 commit comments