Skip to content

Commit cee716f

Browse files
netromeAurelienFTrymncxgreenx
authored
feat: Support rolling back relayer DB (#3022)
Closes #3019 This PR extends the existing rollback functionality to also roll back the relayer database. The DA height to roll back to is inferred based on the DA height of the target block during the rollback. --------- Co-authored-by: AurelienFT <[email protected]> Co-authored-by: Aaryamann Challani <[email protected]> Co-authored-by: Green Baneling <[email protected]>
1 parent 1944903 commit cee716f

File tree

7 files changed

+282
-20
lines changed

7 files changed

+282
-20
lines changed

.changes/added/3027.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Support state rewind for compression DB.

.changes/breaking/3022.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Support rolling back Relayer DB.

bin/fuel-core/src/cli/rollback.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ pub struct Command {
4242

4343
/// The path to the database.
4444
#[clap(long = "target-block-height")]
45-
pub target_block_height: u32,
45+
pub target_block_height: Option<u32>,
46+
47+
/// The path to the database.
48+
#[clap(long = "target-da-block-height")]
49+
pub target_da_block_height: Option<u64>,
4650
}
4751

4852
fn get_default_max_fds() -> i32 {
@@ -68,9 +72,13 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
6872
.context(format!("failed to open combined database at path {path:?}"))?;
6973

7074
let mut shutdown_listener = ShutdownListener::spawn();
71-
let target_block_height = command.target_block_height.into();
75+
if let Some(target_block_height) = command.target_block_height {
76+
db.rollback_to(target_block_height.into(), &mut shutdown_listener)?;
77+
}
7278

73-
db.rollback_to(target_block_height, &mut shutdown_listener)?;
79+
if let Some(target_da_height) = command.target_da_block_height {
80+
db.rollback_relayer_to(target_da_height.into(), &mut shutdown_listener)?;
81+
}
7482

7583
Ok(())
7684
}

ci_checks.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
# - `cargo install cargo-nextest`
1515
# - `npm install prettier prettier-plugin-toml`
1616

17-
npx prettier --write "**/Cargo.toml" &&
1817
cargo +nightly fmt --all &&
1918
cargo sort -w --check &&
2019
source .github/workflows/scripts/verify_openssl.sh &&

crates/fuel-core/src/combined_database.rs

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ use fuel_core_storage::tables::{
3636
ContractsState,
3737
Messages,
3838
};
39-
use fuel_core_types::fuel_types::BlockHeight;
39+
use fuel_core_types::{
40+
blockchain::primitives::DaBlockHeight,
41+
fuel_types::BlockHeight,
42+
};
4043
use std::path::PathBuf;
4144

4245
#[derive(Clone, Debug, Eq, PartialEq)]
@@ -215,7 +218,7 @@ impl CombinedDatabase {
215218
)?;
216219
let relayer = Database::open_rocksdb(
217220
path,
218-
StateRewindPolicy::NoRewind,
221+
state_rewind_policy,
219222
DatabaseConfig {
220223
max_fds,
221224
..database_config
@@ -421,14 +424,18 @@ impl CombinedDatabase {
421424

422425
let gas_price_chain_height =
423426
self.gas_price().latest_height_from_metadata()?;
427+
let gas_price_rolled_back =
428+
is_equal_or_none(gas_price_chain_height, target_block_height);
424429

425-
let gas_price_rolled_back = gas_price_chain_height.is_none()
426-
|| gas_price_chain_height.expect("We checked height before")
427-
== target_block_height;
430+
let compression_db_height =
431+
self.compression().latest_height_from_metadata()?;
432+
let compression_db_rolled_back =
433+
is_equal_or_none(compression_db_height, target_block_height);
428434

429435
if on_chain_height == target_block_height
430436
&& off_chain_height == target_block_height
431437
&& gas_price_rolled_back
438+
&& compression_db_rolled_back
432439
{
433440
break;
434441
}
@@ -450,7 +457,16 @@ impl CombinedDatabase {
450457
if let Some(gas_price_chain_height) = gas_price_chain_height {
451458
if gas_price_chain_height < target_block_height {
452459
return Err(anyhow::anyhow!(
453-
"gas-price-chain database height({gas_price_chain_height}) \
460+
"gas-price database height({gas_price_chain_height}) \
461+
is less than target height({target_block_height})"
462+
));
463+
}
464+
}
465+
466+
if let Some(compression_db_height) = compression_db_height {
467+
if compression_db_height < target_block_height {
468+
return Err(anyhow::anyhow!(
469+
"compression database height({compression_db_height}) \
454470
is less than target height({target_block_height})"
455471
));
456472
}
@@ -469,6 +485,55 @@ impl CombinedDatabase {
469485
self.gas_price().rollback_last_block()?;
470486
}
471487
}
488+
489+
if let Some(compression_db_height) = compression_db_height {
490+
if compression_db_height > target_block_height {
491+
self.compression().rollback_last_block()?;
492+
}
493+
}
494+
}
495+
496+
if shutdown_listener.is_cancelled() {
497+
return Err(anyhow::anyhow!(
498+
"Stop the rollback due to shutdown signal received"
499+
));
500+
}
501+
502+
Ok(())
503+
}
504+
505+
/// Rollbacks the state of the relayer to a specific block height.
506+
pub fn rollback_relayer_to<S>(
507+
&self,
508+
target_da_height: DaBlockHeight,
509+
shutdown_listener: &mut S,
510+
) -> anyhow::Result<()>
511+
where
512+
S: ShutdownListener,
513+
{
514+
while !shutdown_listener.is_cancelled() {
515+
let relayer_db_height = self.relayer().latest_height_from_metadata()?;
516+
let relayer_db_rolled_back =
517+
is_equal_or_none(relayer_db_height, target_da_height);
518+
519+
if relayer_db_rolled_back {
520+
break;
521+
}
522+
523+
if let Some(relayer_db_height) = relayer_db_height {
524+
if relayer_db_height < target_da_height {
525+
return Err(anyhow::anyhow!(
526+
"relayer database height({relayer_db_height}) \
527+
is less than target height({target_da_height})"
528+
));
529+
}
530+
}
531+
532+
if let Some(relayer_db_height) = relayer_db_height {
533+
if relayer_db_height > target_da_height {
534+
self.relayer().rollback_last_block()?;
535+
}
536+
}
472537
}
473538

474539
if shutdown_listener.is_cancelled() {
@@ -557,12 +622,19 @@ impl CombinedGenesisDatabase {
557622
}
558623
}
559624

625+
fn is_equal_or_none<T: PartialEq>(maybe_left: Option<T>, right: T) -> bool {
626+
maybe_left.map(|left| left == right).unwrap_or(true)
627+
}
628+
560629
#[allow(non_snake_case)]
561630
#[cfg(feature = "backup")]
562631
#[cfg(test)]
563632
mod tests {
564633
use super::*;
565-
use fuel_core_storage::StorageAsMut;
634+
use fuel_core_storage::{
635+
StorageAsMut,
636+
StorageAsRef,
637+
};
566638
use fuel_core_types::{
567639
entities::coins::coin::CompressedCoin,
568640
fuel_tx::UtxoId,
@@ -603,7 +675,7 @@ mod tests {
603675
)
604676
.unwrap();
605677

606-
let mut restored_on_chain_db = restored_db.on_chain();
678+
let restored_on_chain_db = restored_db.on_chain();
607679
let restored_value = restored_on_chain_db
608680
.storage::<Coins>()
609681
.get(&key)

tests/tests/da_compression.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
use clap::Parser;
12
use core::time::Duration;
23
use fuel_core::{
34
chain_config::TESTNET_WALLET_SECRETS,
5+
combined_database::CombinedDatabase,
6+
database::database_description::DatabaseHeight,
47
p2p_test_helpers::*,
58
service::{
69
Config,
@@ -10,6 +13,13 @@ use fuel_core::{
1013
DaCompressionMode,
1114
},
1215
},
16+
state::{
17+
historical_rocksdb::StateRewindPolicy,
18+
rocks_db::{
19+
ColumnsPolicy,
20+
DatabaseConfig,
21+
},
22+
},
1323
};
1424
use fuel_core_client::client::{
1525
FuelClient,
@@ -253,6 +263,57 @@ async fn da_compression__starts_and_compresses_blocks_correctly_from_empty_datab
253263
driver.kill().await;
254264
}
255265

266+
#[tokio::test]
267+
async fn da_compression__db_can_be_rewinded() {
268+
// given
269+
let rollback_target_height = 0;
270+
let blocks_to_produce = 10;
271+
272+
let args = vec!["--da-compression", "7d", "--debug"];
273+
let driver = FuelCoreDriver::spawn(&args).await.unwrap();
274+
let current_height = driver
275+
.client
276+
.produce_blocks(blocks_to_produce, None)
277+
.await
278+
.unwrap();
279+
assert_eq!(current_height, blocks_to_produce.into());
280+
281+
let db_dir = driver.kill().await;
282+
283+
// when
284+
let target_block_height = rollback_target_height.to_string();
285+
let args = [
286+
"_IGNORED_",
287+
"--db-path",
288+
db_dir.path().to_str().unwrap(),
289+
"--target-block-height",
290+
target_block_height.as_str(),
291+
];
292+
293+
let command = fuel_core_bin::cli::rollback::Command::parse_from(args);
294+
fuel_core_bin::cli::rollback::exec(command).await.unwrap();
295+
296+
let db = CombinedDatabase::open(
297+
db_dir.path(),
298+
StateRewindPolicy::RewindFullRange,
299+
DatabaseConfig {
300+
cache_capacity: Some(16 * 1024 * 1024 * 1024),
301+
max_fds: -1,
302+
columns_policy: ColumnsPolicy::Lazy,
303+
},
304+
)
305+
.expect("Failed to create database");
306+
307+
let compression_db_block_height_after_rollback =
308+
db.compression().latest_height_from_metadata().unwrap();
309+
310+
// then
311+
assert_eq!(
312+
compression_db_block_height_after_rollback.unwrap().as_u64(),
313+
rollback_target_height
314+
);
315+
}
316+
256317
#[tokio::test]
257318
async fn da_compression__starts_and_compresses_blocks_correctly_with_overridden_height() {
258319
// given: the node starts without compression enabled, and produces blocks

0 commit comments

Comments
 (0)