diff --git a/Cargo.lock b/Cargo.lock index c88d879..fc116b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8935,7 +8935,7 @@ dependencies = [ [[package]] name = "commander" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "anyhow", "futures", @@ -12064,7 +12064,7 @@ dependencies = [ [[package]] name = "include-dir" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "anyhow", "commander 0.1.0", @@ -12079,7 +12079,7 @@ dependencies = [ [[package]] name = "include-vendor" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "anyhow", "cargo_metadata 0.19.2", @@ -12375,7 +12375,7 @@ dependencies = [ [[package]] name = "jsonlvar" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "jsonlvar-core", "jsonlvar-macro", @@ -12386,7 +12386,7 @@ dependencies = [ [[package]] name = "jsonlvar-core" version = "0.1.0" -source = 
"git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "regex", "serde", @@ -12397,7 +12397,7 @@ dependencies = [ [[package]] name = "jsonlvar-macro" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "proc-macro2", "quote", @@ -12409,7 +12409,7 @@ dependencies = [ [[package]] name = "jsonlvar-tokio" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "anyhow", "jsonlvar", @@ -12618,7 +12618,7 @@ dependencies = [ [[package]] name = "kestrel" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "futures", "kestrel-macro", @@ -12631,7 +12631,7 @@ dependencies = [ [[package]] name = "kestrel-macro" version = "0.1.0" -source = 
"git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "convert_case 0.8.0", "proc-macro2", @@ -12644,7 +12644,7 @@ dependencies = [ [[package]] name = "kestrel-process" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "commander 0.1.0", "jsonlvar", @@ -12658,7 +12658,7 @@ dependencies = [ [[package]] name = "kestrel-state" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "thiserror 1.0.69", "tokio", @@ -15955,6 +15955,7 @@ name = "mtma-node-types" version = "0.0.1" dependencies = [ "anyhow", + "bytes", "either", "futures-channel", "maptos-opt-executor", @@ -15963,6 +15964,7 @@ dependencies = [ "mtma-types", "tempfile", "thiserror 1.0.69", + "tokio", "tracing", "tracing-test", "uuid", @@ -18471,7 +18473,7 @@ dependencies = [ [[package]] name = "ready-docker" version = "0.1.0" -source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=3220d704df7e06d1dcc5266e15eaf05db86fdb07#3220d704df7e06d1dcc5266e15eaf05db86fdb07" +source = "git+https://github.com/movementlabsxyz/kestrel.git?rev=e23cbcc8c804d27d8ae39f582ff7eae6bebad503#e23cbcc8c804d27d8ae39f582ff7eae6bebad503" dependencies = [ "anyhow", "bollard", diff --git a/Cargo.toml b/Cargo.toml index 31c22fe..3959463 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,7 @@ chrono = "0.4.31" rand = "0.7.3" uuid = "1.10.0" glob = "0.3.2" +bytes = { version = 
"1.4.0", features = ["serde"] } poem = { version = "=3.1.3", features = ["anyhow", "compression", "rustls"] } poem-openapi = { version = "=5.1.2", features = ["swagger-ui", "url"] } @@ -141,12 +142,12 @@ aptos-vm-genesis = { git = "https://github.com/movementlabsxyz/aptos-core.git", # kestrel -kestrel = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "3220d704df7e06d1dcc5266e15eaf05db86fdb07" } -jsonlvar = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "3220d704df7e06d1dcc5266e15eaf05db86fdb07" } -commander = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "3220d704df7e06d1dcc5266e15eaf05db86fdb07" } -include-dir = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "3220d704df7e06d1dcc5266e15eaf05db86fdb07" } -include-vendor = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "3220d704df7e06d1dcc5266e15eaf05db86fdb07" } -ready-docker = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "3220d704df7e06d1dcc5266e15eaf05db86fdb07" } +kestrel = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "e23cbcc8c804d27d8ae39f582ff7eae6bebad503" } +jsonlvar = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "e23cbcc8c804d27d8ae39f582ff7eae6bebad503" } +commander = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "e23cbcc8c804d27d8ae39f582ff7eae6bebad503" } +include-dir = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "e23cbcc8c804d27d8ae39f582ff7eae6bebad503" } +include-vendor = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "e23cbcc8c804d27d8ae39f582ff7eae6bebad503" } +ready-docker = { git = "https://github.com/movementlabsxyz/kestrel.git", rev = "e23cbcc8c804d27d8ae39f582ff7eae6bebad503" } # orfile orfile = { git = "https://github.com/movementlabsxyz/orfile.git", rev = "f02851242af77791b905efc19aef6af21918fb1e" } diff --git a/checks/migrator/checks/balances-equal/src/balances_equal.rs 
b/checks/migrator/checks/balances-equal/src/balances_equal.rs index ebce5fa..9ef9bbe 100644 --- a/checks/migrator/checks/balances-equal/src/balances_equal.rs +++ b/checks/migrator/checks/balances-equal/src/balances_equal.rs @@ -12,62 +12,51 @@ pub mod test { #[tokio::test(flavor = "multi_thread")] #[tracing_test::traced_test] async fn test_balances_equal() -> Result<(), anyhow::Error> { - // use a scope to ensure everything is dropped - { - // Form the migrator. - let mut movement_migrator = MovementMigrator::try_temp()?; - movement_migrator.set_overlays(Overlays::default()); - - // Start the migrator so that it's running in the background. - // In the future, some migrators may be for already running nodes. - let movement_migrator_for_task = movement_migrator.clone(); - let movement_migrator_task = kestrel::task(async move { - movement_migrator_for_task.run().await?; - Ok::<_, anyhow::Error>(()) - }); - - // wait for the rest client to be ready - // once we have this, there should also be a config, so we can then kill off the migrator and proceed - movement_migrator + // Form the migrator. + let mut movement_migrator = MovementMigrator::try_temp()?; + movement_migrator.set_overlays(Overlays::default()); + + // Start the migrator so that it's running in the background. + // In the future, some migrators may be for already running nodes. 
+ let movement_migrator_for_task = movement_migrator.clone(); + let movement_migrator_task = kestrel::task(async move { + movement_migrator_for_task.run().await?; + Ok::<_, anyhow::Error>(()) + }); + + // wait for the rest client to be ready + // once we have this, there should also be a config, so we can then kill off the migrator and proceed + movement_migrator .wait_for_rest_client_ready(tokio::time::Duration::from_secs(600)) // we wait for up to ten minutes because the nix flake in .vendors/movementcan be a bit slow the first time .await .context( "failed to wait for movement migrator rest client while running accounts equal manual prelude", )?; - kestrel::end!(movement_migrator_task)?; + kestrel::end!(movement_migrator_task)?; - // Form the prelude. - // todo: this needs to be updated to use the prelude generator - let prelude = Prelude::new_empty(); + // Form the prelude. + // todo: this needs to be updated to use the prelude generator + let prelude = Prelude::new_empty(); - // Form the migration. - let migration_config = MtmaNullConfig::default(); - let migration = migration_config.build()?; + // Form the migration. + let migration_config = MtmaNullConfig::default(); + let migration = migration_config.build()?; - // Run the checked migration. - let balances_equal = BalancesEqual::new(); - info!("Running migration"); - match checked_migration( - &mut movement_migrator, - &prelude, - &migration, - vec![balances_equal], - ) + // Run the checked migration. 
+ let balances_equal = BalancesEqual::new(); + info!("Running migration"); + match checked_migration(&mut movement_migrator, &prelude, &migration, vec![balances_equal]) .await - { - Ok(()) => {} - Err(e) => { - info!("Migration failed: {:?}", e); - return Err(anyhow::anyhow!("Migration failed: {:?}", e)); - } + { + Ok(()) => { + info!("Migration succeeded"); + std::process::exit(0); + } + Err(e) => { + info!("Migration failed: {:?}", e); + panic!("Migration failed: {:?}", e); } - info!("Migration succeeded"); } - - // exit the test is fine when you only have one test per crate because when cargo test is run across a workspace, it actually multi-processes the tests by crate - std::process::exit(0); - - // Ok(()) } } diff --git a/checks/migrator/checks/sketchpad/src/accounts_equal.rs b/checks/migrator/checks/sketchpad/src/accounts_equal.rs index 810351c..d877692 100644 --- a/checks/migrator/checks/sketchpad/src/accounts_equal.rs +++ b/checks/migrator/checks/sketchpad/src/accounts_equal.rs @@ -12,62 +12,53 @@ pub mod test { #[tokio::test(flavor = "multi_thread")] #[tracing_test::traced_test] async fn test_accounts_equal() -> Result<(), anyhow::Error> { - // use a scope to ensure everything is dropped - { - // Form the migrator. - let mut movement_migrator = MovementMigrator::try_temp()?; - movement_migrator.set_overlays(Overlays::default()); + // Form the migrator. + let mut movement_migrator = MovementMigrator::try_temp()?; + movement_migrator.set_overlays(Overlays::default()); - // Start the migrator so that it's running in the background. - // In the future, some migrators may be for already running nodes. - let movement_migrator_for_task = movement_migrator.clone(); - let movement_migrator_task = kestrel::task(async move { - movement_migrator_for_task.run().await?; - Ok::<_, anyhow::Error>(()) - }); + // Start the migrator so that it's running in the background. + // In the future, some migrators may be for already running nodes. 
+ let movement_migrator_for_task = movement_migrator.clone(); + let movement_migrator_task = kestrel::task(async move { + movement_migrator_for_task.run().await?; + Ok::<_, anyhow::Error>(()) + }); - // wait for the rest client to be ready - // once we have this, there should also be a config, so we can then kill off the migrator and proceed - movement_migrator - .wait_for_rest_client_ready(tokio::time::Duration::from_secs(600)) // we wait for up to ten minutes because the nix flake in .vendors/movementcan be a bit slow the first time - .await - .context( - "failed to wait for movement migrator rest client while running accounts equal manual prelude", - )?; + // wait for the rest client to be ready + // once we have this, there should also be a config, so we can then kill off the migrator and proceed + movement_migrator + .wait_for_rest_client_ready(tokio::time::Duration::from_secs(600)) // we wait for up to ten minutes because the nix flake in .vendors/movementcan be a bit slow the first time + .await + .context( + "failed to wait for movement migrator rest client while running accounts equal manual prelude", + )?; - kestrel::end!(movement_migrator_task)?; + kestrel::end!(movement_migrator_task)?; - // Form the prelude. - // todo: this needs to be updated to use the prelude generator - let prelude = Prelude::new_empty(); + // Form the prelude. + // todo: this needs to be updated to use the prelude generator + let prelude = Prelude::new_empty(); - // Form the migration. - let migration_config = MtmaNullConfig::default(); - let migration = migration_config.build()?; + // Form the migration. + let migration_config = MtmaNullConfig::default(); + let migration = migration_config.build()?; - // Run the checked migration. - let accounts_equal = AccountsEqual::new(); - info!("Running migration"); - match checked_migration( - &mut movement_migrator, - &prelude, - &migration, - vec![accounts_equal], - ) + // Run the checked migration. 
+ let accounts_equal = AccountsEqual::new(); + info!("Running migration"); + match checked_migration(&mut movement_migrator, &prelude, &migration, vec![accounts_equal]) .await - { - Ok(()) => {} - Err(e) => { - info!("Migration failed: {:?}", e); - return Err(anyhow::anyhow!("Migration failed: {:?}", e)); - } + { + Ok(_) => { + info!("Migration succeeded"); + std::process::exit(0); + } + Err(e) => { + info!("Migration failed: {:?}", e); + panic!("Migration failed: {:?}", e); } - info!("Migration succeeded"); } - // exit the test is fine when you only have one test per crate because when cargo test is run across a workspace, it actually multi-processes the tests by crate - std::process::exit(0); - // Ok(()) } } diff --git a/checks/migrator/citeria/accounts-equal/src/lib.rs b/checks/migrator/citeria/accounts-equal/src/lib.rs index d13891a..7280790 100644 --- a/checks/migrator/citeria/accounts-equal/src/lib.rs +++ b/checks/migrator/citeria/accounts-equal/src/lib.rs @@ -30,6 +30,7 @@ impl Criterionish for AccountsEqual { "failed to wait for movement migrator rest client while checking accounts equal", ) .map_err(|e| CriterionError::Internal(e.into()))?; + let movement_aptos_rest_client = movement_aptos_migrator .wait_for_rest_client_ready(tokio::time::Duration::from_secs(30)) .await @@ -41,19 +42,13 @@ impl Criterionish for AccountsEqual { info!("Iterating over movement node accounts"); for account_address_res in movement_node - .iter_account_addresses(0) + .iter_account_addresses(Some(0)) + .iter() .map_err(|e| CriterionError::Internal(e.into()))? 
{ - let account_address = match account_address_res - .context("account address is none") - .map_err(|e| CriterionError::Internal(e.into())) - { - Ok(account_address) => account_address, - Err(e) => { - info!("Transaction has no sender: {:?}", e); - continue; - } - }; + let account_address = account_address_res + .context("failed to get account address") + .map_err(|e| CriterionError::Internal(e.into()))?; info!("Getting movement resource"); let movement_resource = movement_rest_client diff --git a/checks/migrator/citeria/balances-equal/src/lib.rs b/checks/migrator/citeria/balances-equal/src/lib.rs index 2f7bcb7..f51e875 100644 --- a/checks/migrator/citeria/balances-equal/src/lib.rs +++ b/checks/migrator/citeria/balances-equal/src/lib.rs @@ -41,19 +41,13 @@ impl Criterionish for BalancesEqual { info!("Iterating over movement node accounts"); for account_address_res in movement_node - .iter_account_addresses(0) + .iter_account_addresses(Some(0)) + .iter() .map_err(|e| CriterionError::Internal(e.into()))? 
{ - let account_address = match account_address_res - .context("account address is none") - .map_err(|e| CriterionError::Internal(e.into())) - { - Ok(account_address) => account_address, - Err(e) => { - info!("Transaction has no sender: {:?}", e); - continue; - } - }; + let account_address = account_address_res + .context("failed to get account address") + .map_err(|e| CriterionError::Internal(e.into()))?; info!("Getting movement account balance"); let movement_account_balance = movement_rest_client diff --git a/checks/migrator/util/types/src/check.rs b/checks/migrator/util/types/src/check.rs index 6dd349d..a40fe2c 100644 --- a/checks/migrator/util/types/src/check.rs +++ b/checks/migrator/util/types/src/check.rs @@ -2,7 +2,7 @@ use crate::criterion::{Criterionish, MovementMigrator}; use anyhow::Context; use mtma_migrator_types::migration::Migrationish; use mtma_node_test_types::prelude::Prelude; -use tracing::info; +use tracing::{info, warn}; /// Errors thrown when working with the [Config]. 
#[derive(Debug, thiserror::Error)] @@ -24,6 +24,14 @@ pub async fn checked_migration( migration: &impl Migrationish, criteria: Vec, ) -> Result<(), CheckError> { + // reset the states of the movement migrator + info!("Resetting movement migrator states"); + movement_migrator + .reset_states() + .await + .context("failed to reset movement migrator states") + .map_err(|e| CheckError::Internal(e.into()))?; + // Get the executor info!("Getting movement executor"); let mut movement_executor = movement_migrator @@ -52,7 +60,10 @@ pub async fn checked_migration( info!("Tasking movement migrator"); let movement_migrator_for_task = movement_migrator.clone(); let movement_migrator_task = kestrel::task(async move { - movement_migrator_for_task.run().await?; + movement_migrator_for_task.run().await.map_err(|e| { + warn!("Failed to run movement migrator: {:?}", e); + e + })?; Ok::<_, anyhow::Error>(()) }); @@ -65,18 +76,20 @@ pub async fn checked_migration( }); // wait for the rest client to be ready - // once we have this, there should also be a config, so we can then kill off the migrator and proceed info!("Waiting for movement migrator rest client"); movement_migrator - .wait_for_rest_client_ready(tokio::time::Duration::from_secs(120)) + .wait_for_rest_client_ready(tokio::time::Duration::from_secs(300)) .await .context("failed to wait for movement migrator rest client") - .map_err(|e| CheckError::Migration(e.into()))?; + .map_err(|e| { + warn!("Failed to wait for movement migrator rest client: {:?}", e); + CheckError::Migration(e.into()) + })?; // wait for movement aptos migrator to be ready info!("Waiting for movement aptos migrator rest client"); movement_aptos_migrator - .wait_for_rest_client_ready(tokio::time::Duration::from_secs(120)) + .wait_for_rest_client_ready(tokio::time::Duration::from_secs(300)) .await .context("failed to wait for movement aptos migrator rest client") .map_err(|e| CheckError::Migration(e.into()))?; @@ -84,18 +97,11 @@ pub async fn 
checked_migration( // Run the criteria info!("Running criteria"); for criterion in criteria { - match criterion + criterion .satisfies(&movement_migrator, &movement_aptos_migrator) .await .context("failed to satisfy criterion") - .map_err(|e| CheckError::Criteria(e.into())) - { - Ok(()) => {} - Err(e) => { - info!("Criterion failed: {:?}", e); - return Err(e); - } - } + .map_err(|e| CheckError::Criteria(e.into()))?; } // kill the movement migrator diff --git a/checks/node/checks/sketchpad/src/global_storage_includes.rs b/checks/node/checks/sketchpad/src/global_storage_includes.rs deleted file mode 100644 index 9cb6a05..0000000 --- a/checks/node/checks/sketchpad/src/global_storage_includes.rs +++ /dev/null @@ -1,40 +0,0 @@ -#[cfg(test)] -pub mod test { - - use mtma_node_preludes::basic::BasicPrelude; - use mtma_node_test_global_storage_includes_criterion::GlobalStorageIncludes; - use mtma_node_test_types::{ - check::checked_migration, - criterion::movement_executor::{MovementNode, MovementOptExecutor}, - prelude::PreludeGenerator, - }; - use mtma_node_null_core::config::Config as MtmaNullConfig; - - #[tokio::test] - #[tracing_test::traced_test] - async fn test_global_storage_includes_null() -> Result<(), anyhow::Error> { - // form the executor - let (movement_opt_executor, _temp_dir, private_key, _receiver) = - MovementOptExecutor::try_generated().await?; - let mut movement_executor = MovementNode::new(movement_opt_executor); - - // form the prelude - let prelude = BasicPrelude { private_key, chain_id: movement_executor.chain_id() } - .generate() - .await?; - - // form the migration - let migration_config = MtmaNullConfig::default(); - let migration = migration_config.build()?; - - // run the checked migration - checked_migration( - &mut movement_executor, - &prelude, - &migration, - vec![Box::new(GlobalStorageIncludes::new())], - ) - .await?; - Ok(()) - } -} diff --git a/checks/node/checks/sketchpad/src/global_storage_injective.rs 
b/checks/node/checks/sketchpad/src/global_storage_injective.rs deleted file mode 100644 index d1833c8..0000000 --- a/checks/node/checks/sketchpad/src/global_storage_injective.rs +++ /dev/null @@ -1,40 +0,0 @@ -#[cfg(test)] -pub mod test { - - use mtma_node_preludes::basic::BasicPrelude; - use mtma_node_test_global_storage_injective_criterion::GlobalStorageInjective; - use mtma_node_test_types::{ - check::checked_migration, - criterion::movement_executor::{MovementNode, MovementOptExecutor}, - prelude::PreludeGenerator, - }; - use mtma_node_null_core::config::Config as MtmaNullConfig; - - #[tokio::test] - async fn test_global_storage_injective_null() -> Result<(), anyhow::Error> { - // form the executor - let (movement_opt_executor, _temp_dir, private_key, _receiver) = - MovementOptExecutor::try_generated().await?; - let mut movement_executor = MovementNode::new(movement_opt_executor); - - // form the prelude - let prelude_generator = - BasicPrelude { private_key, chain_id: movement_executor.chain_id() }; - let prelude = prelude_generator.generate().await?; - - // form the migration - let migration_config = MtmaNullConfig::default(); - let migration = migration_config.build()?; - - // run the checked migration - checked_migration( - &mut movement_executor, - &prelude, - &migration, - vec![Box::new(GlobalStorageInjective::new())], - ) - .await?; - - Ok(()) - } -} diff --git a/checks/node/checks/sketchpad/src/global_storage_not_empty.rs b/checks/node/checks/sketchpad/src/global_storage_not_empty.rs deleted file mode 100644 index e396fa8..0000000 --- a/checks/node/checks/sketchpad/src/global_storage_not_empty.rs +++ /dev/null @@ -1,40 +0,0 @@ -#[cfg(test)] -pub mod test { - - use mtma_node_preludes::basic::BasicPrelude; - use mtma_node_test_global_storage_not_empty_criterion::GlobalStorageNotEmpty; - use mtma_node_test_types::{ - check::checked_migration, - criterion::movement_executor::{MovementNode, MovementOptExecutor}, - prelude::PreludeGenerator, - }; - use 
mtma_node_null_core::config::Config as MtmaNullConfig; - - #[tokio::test] - async fn test_global_storage_injective_null() -> Result<(), anyhow::Error> { - // form the executor - let (movement_opt_executor, _temp_dir, private_key, _receiver) = - MovementOptExecutor::try_generated().await?; - let mut movement_executor = MovementNode::new(movement_opt_executor); - - // form the prelude - let prelude_generator = - BasicPrelude { private_key, chain_id: movement_executor.chain_id() }; - let prelude = prelude_generator.generate().await?; - - // form the migration - let migration_config = MtmaNullConfig::default(); - let migration = migration_config.build()?; - - // run the checked migration - checked_migration( - &mut movement_executor, - &prelude, - &migration, - vec![Box::new(GlobalStorageNotEmpty::new())], - ) - .await?; - - Ok(()) - } -} diff --git a/migration/util/migrator-types/src/migrator/movement_migrator.rs b/migration/util/migrator-types/src/migrator/movement_migrator.rs index e460dbc..6f17cc0 100644 --- a/migration/util/migrator-types/src/migrator/movement_migrator.rs +++ b/migration/util/migrator-types/src/migrator/movement_migrator.rs @@ -103,4 +103,14 @@ impl MovementMigrator { Runner::Movement(movement) => movement.set_overlays(overlays), } } + + /// Resets the states of the [MovementMigrator] (forces them to be filled again). 
+ pub async fn reset_states(&self) -> Result<(), anyhow::Error> { + match &self.runner { + Runner::Movement(movement) => movement + .reset_states() + .await + .map_err(|e| anyhow::anyhow!("failed to reset movement states: {}", e)), + } + } } diff --git a/migration/util/node-types/Cargo.toml b/migration/util/node-types/Cargo.toml index b19ba56..9266170 100644 --- a/migration/util/node-types/Cargo.toml +++ b/migration/util/node-types/Cargo.toml @@ -20,10 +20,12 @@ movement-util = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } mtma-types = { workspace = true } +bytes = { workspace = true } [dev-dependencies] tempfile = { workspace = true } tracing-test = { workspace = true } +tokio = { workspace = true } [lints] workspace = true diff --git a/migration/util/node-types/src/executor/movement_executor.rs b/migration/util/node-types/src/executor/movement_executor.rs index f728a35..9ecc24c 100644 --- a/migration/util/node-types/src/executor/movement_executor.rs +++ b/migration/util/node-types/src/executor/movement_executor.rs @@ -1,24 +1,16 @@ use anyhow::Context; -use either::Either; + pub use maptos_opt_executor::Executor as MovementOptExecutor; use movement_util::common_args::MovementArgs; use mtma_types::movement::aptos_crypto::HashValue; use mtma_types::movement::aptos_storage_interface::state_view::DbStateView; -use mtma_types::movement::aptos_storage_interface::DbReader; use mtma_types::movement::aptos_types::state_store::state_key::StateKey; -use mtma_types::movement::aptos_types::transaction::Version; -use mtma_types::movement::aptos_types::{ - account_address::AccountAddress, - block_executor::partitioner::{ExecutableBlock, ExecutableTransactions}, - transaction::signature_verified_transaction::into_signature_verified_block, - transaction::Transaction, -}; +use mtma_types::movement::aptos_types::transaction::Transaction; pub use mtma_types::movement::aptos_types::{chain_id::ChainId, state_store::TStateView}; use std::fs; use 
std::fs::Permissions; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; -use std::sync::Arc; use tracing::info; pub use maptos_opt_executor; @@ -26,6 +18,17 @@ use tracing::debug; use uuid::Uuid; use walkdir::WalkDir; +use bytes::Bytes; + +pub mod account_address; +pub use account_address::*; +pub mod block; +pub use block::*; +pub mod transaction; +pub use transaction::*; +pub mod global_state_key; +pub use global_state_key::*; + /// The Movement executor as would be presented in the criterion. pub struct MovementNode { /// The opt executor. @@ -35,73 +38,82 @@ pub struct MovementNode { } /// Copies a directory recursively while ignoring specified paths -fn copy_dir_recursive_with_ignore(src: &Path, ignore_paths: impl IntoIterator>, dst: &Path) -> Result<(), anyhow::Error> { - let ignore_paths: Vec<_> = ignore_paths.into_iter().collect(); - let mut errors: Vec> = Vec::new(); - - // Configure WalkDir to be more resilient - let walker = WalkDir::new(src) - .follow_links(false) - .same_file_system(true) - .into_iter() - .filter_entry(|entry| { - // Early filter for ignored paths to avoid permission errors - let path = entry.path(); - !ignore_paths.iter().any(|ignore| path.to_string_lossy().contains(ignore.as_ref().to_string_lossy().as_ref())) - }); - - for entry in walker { - let entry = match entry { - Ok(e) => e, - Err(e) => { - // Check if this is a permission error on an ignored path - if let Some(path) = e.path() { - if ignore_paths.iter().any(|ignore| path.to_string_lossy().contains(ignore.as_ref().to_string_lossy().as_ref())) { - debug!("Ignoring permission error on ignored path: {}", path.display()); - continue; - } - } - // For other errors, log with info for now and fail. 
- info!("Error accessing path: {:?}", e); +fn copy_dir_recursive_with_ignore( + src: &Path, + ignore_paths: impl IntoIterator>, + dst: &Path, +) -> Result<(), anyhow::Error> { + let ignore_paths: Vec<_> = ignore_paths.into_iter().collect(); + let mut errors: Vec> = Vec::new(); + + // Configure WalkDir to be more resilient + let walker = WalkDir::new(src) + .follow_links(false) + .same_file_system(true) + .into_iter() + .filter_entry(|entry| { + // Early filter for ignored paths to avoid permission errors + let path = entry.path(); + !ignore_paths.iter().any(|ignore| { + path.to_string_lossy().contains(ignore.as_ref().to_string_lossy().as_ref()) + }) + }); + + for entry in walker { + let entry = match entry { + Ok(e) => e, + Err(e) => { + // Check if this is a permission error on an ignored path + if let Some(path) = e.path() { + if ignore_paths.iter().any(|ignore| { + path.to_string_lossy().contains(ignore.as_ref().to_string_lossy().as_ref()) + }) { + debug!("Ignoring permission error on ignored path: {}", path.display()); + continue; + } + } + // For other errors, log with info for now and fail. 
+ info!("Error accessing path: {:?}", e); errors.push(Err(anyhow::anyhow!("failed to get entry: {:?}", e))); - continue; - } - }; - - debug!("Processing entry: {}", entry.path().display()); - - let path = entry.path(); - let dest_path = dst.join(path.strip_prefix(src).context("failed to strip prefix")?); - - if entry.file_type().is_dir() { - match fs::create_dir_all(&dest_path) { - Ok(_) => (), - Err(e) => errors.push(Err(e.into())), - } - } else { - if let Some(parent) = dest_path.parent() { - match fs::create_dir_all(parent) { - Ok(_) => (), - Err(e) => errors.push(Err(e.into())), - } - } - match fs::copy(path, &dest_path) { - Ok(_) => (), - Err(e) => errors.push(Err(e.into())), - } - } - } - - // Combine all results into one - if !errors.is_empty() { - let mut total_error_message = String::from("failed to copy directory with the following errors: "); - for error in errors { - total_error_message = total_error_message + &format!("{:?}\n", error); - } - return Err(anyhow::anyhow!(total_error_message)); - } - - Ok(()) + continue; + } + }; + + debug!("Processing entry: {}", entry.path().display()); + + let path = entry.path(); + let dest_path = dst.join(path.strip_prefix(src).context("failed to strip prefix")?); + + if entry.file_type().is_dir() { + match fs::create_dir_all(&dest_path) { + Ok(_) => (), + Err(e) => errors.push(Err(e.into())), + } + } else { + if let Some(parent) = dest_path.parent() { + match fs::create_dir_all(parent) { + Ok(_) => (), + Err(e) => errors.push(Err(e.into())), + } + } + match fs::copy(path, &dest_path) { + Ok(_) => (), + Err(e) => errors.push(Err(e.into())), + } + } + } + + // Combine all results into one + if !errors.is_empty() { + let mut total_error_message = + String::from("failed to copy directory with the following errors: "); + for error in errors { + total_error_message = total_error_message + &format!("{:?}\n", error); + } + return Err(anyhow::anyhow!(total_error_message)); + } + + Ok(()) } /// Sets all permission in a 
directory recursively. @@ -132,16 +144,23 @@ impl MovementNode { // set the permissions on the movement dir to 755 // Note: this would mess up celestia node permissions, but we don't care about that here. // We really only care about maptos db permissions. - fs::set_permissions(&movement_dir, Permissions::from_mode(0o755)).context(format!("failed to set permissions on the movement directory {}", movement_dir.display()))?; + fs::set_permissions(&movement_dir, Permissions::from_mode(0o755)).context(format!( + "failed to set permissions on the movement directory {}", + movement_dir.display() + ))?; // don't copy anything from the celestia directory - copy_dir_recursive_with_ignore(&movement_dir, [PathBuf::from("celestia")], &debug_dir).context("failed to copy movement dir")?; + copy_dir_recursive_with_ignore(&movement_dir, [PathBuf::from("celestia")], &debug_dir) + .context("failed to copy movement dir")?; // Set all permissions in the debug directory recursively // Note: this would mess up celestia node permissions, but we don't care about that here. // We really only care about maptos db permissions. // TODO: tighten the copying accordingly. - set_permissions_recursive(&debug_dir, Permissions::from_mode(0o755)).context(format!("failed to set permissions on the debug directory {}", debug_dir.display()))?; + set_permissions_recursive(&debug_dir, Permissions::from_mode(0o755)).context(format!( + "failed to set permissions on the debug directory {}", + debug_dir.display() + ))?; let movement_args = MovementArgs { movement_path: Some(debug_dir.display().to_string()) }; @@ -216,12 +235,15 @@ impl MovementNode { self.opt_executor().state_view_at_version(version) } - /// Gets the all [StateKey]s in the global storage dating back to an original version. None is treated as 0 or all versions. 
- pub fn global_state_keys_from_version(&self, version: Option) -> GlobalStateKeyIterable { - GlobalStateKeyIterable { - db_reader: self.opt_executor().db_reader(), - version: version.unwrap_or(0), - } + /// Get state value bytes for a given version and state key. + pub fn get_state_value_bytes( + &self, + version: Option, + state_key: StateKey, + ) -> Result, anyhow::Error> { + let state_view = self.state_view_at_version(version)?; + let bytes = state_view.get_state_value_bytes(&state_key)?; + Ok(bytes) } /// Gets the genesis block hash. @@ -231,12 +253,6 @@ impl MovementNode { Ok(block_event.hash()?) } - /// Iterates over all blocks in the db. - pub fn iter_blocks(&self, start_version: u64) -> Result, anyhow::Error> { - let latest_version = self.latest_ledger_version()?; - Ok(BlockIterator { executor: self, version: start_version, latest_version }) - } - /// Gets the genesis transaction. pub fn genesis_transaction(&self) -> Result { // get genesis transaction from db @@ -245,236 +261,6 @@ impl MovementNode { db_reader.get_transaction_by_version(0, self.latest_ledger_version()?, true)?; Ok(genesis_transaction.transaction) } - - /// Iterates over all transactions in the db. 
- pub fn iter_transactions( - &self, - start_version: u64, - ) -> Result, anyhow::Error> { - Ok(TransactionIterator::new(self, start_version, self.latest_ledger_version()?)) - } - - /// Iterates over all account keys - /// NOTE: does this by checking all transactions and getting the [AccountAddress] signers from them - pub fn iter_account_addresses( - &self, - start_version: u64, - ) -> Result, anyhow::Error> { - Ok(AccountAddressIterator::new(self, start_version)) - } -} - -pub struct BlockIterator<'a> { - executor: &'a MovementNode, - version: u64, - latest_version: u64, -} - -impl<'a> Iterator for BlockIterator<'a> { - type Item = Result<(Version, Version, ExecutableBlock), anyhow::Error>; - - fn next(&mut self) -> Option { - if self.version > self.latest_version { - return None; - } - - let db_reader = self.executor.opt_executor().db_reader(); - let (start_version, end_version, new_block_event) = - match db_reader.get_block_info_by_version(self.version) { - Ok(info) => info, - Err(e) => return Some(Err(e.into())), - }; - - let mut transactions = Vec::new(); - for version in start_version..=end_version { - let transaction = - match db_reader.get_transaction_by_version(version, self.latest_version, false) { - Ok(t) => t, - Err(e) => return Some(Err(e.into())), - }; - transactions.push(transaction.transaction); - } - - let executable_transactions = - ExecutableTransactions::Unsharded(into_signature_verified_block(transactions)); - let block = ExecutableBlock::new( - match new_block_event.hash() { - Ok(hash) => hash, - Err(e) => return Some(Err(e.into())), - }, - executable_transactions, - ); - - self.version = end_version + 1; - Some(Ok((start_version, end_version, block))) - } -} -/// An iterable of [StateKey]s in the global storage dating back to an original version. -/// -/// This helps deal with lifetime issues. 
-pub struct GlobalStateKeyIterable { - db_reader: Arc, - version: u64, -} - -const MAX_WRITE_SET_SIZE: u64 = 20_000; - -impl GlobalStateKeyIterable { - pub fn iter( - &self, - ) -> Result> + '_>, anyhow::Error> { - let write_set_iterator = - self.db_reader.get_write_set_iterator(self.version, MAX_WRITE_SET_SIZE)?; - - // We want to iterate lazily over the write set iterator because there could be a lot of them. - let mut count = 0; - let iter = write_set_iterator.flat_map(move |res| match res { - Ok(write_set) => { - debug!("Iterating over write set {}", count); - count += 1; - // It should be okay to collect because there should not be that many state keys in a write set. - let items: Vec<_> = write_set.iter().map(|(key, _)| Ok(key.clone())).collect(); - Either::Left(items.into_iter()) - } - Err(e) => Either::Right(std::iter::once(Err(e.into()))), - }); - - Ok(Box::new(iter)) - } -} - -pub struct TransactionIterator<'a> { - executor: &'a MovementNode, - version: u64, - latest_version: u64, - current_block_iter: Option + 'a>>, -} - -impl<'a> TransactionIterator<'a> { - fn new(executor: &'a MovementNode, version: u64, latest_version: u64) -> Self { - Self { executor, version, latest_version, current_block_iter: None } - } -} - -impl<'a> Iterator for TransactionIterator<'a> { - type Item = Result; - - fn next(&mut self) -> Option { - // If we have a current block iterator, try to get the next transaction from it - info!("Getting next transaction from block iterator"); - if let Some(iter) = &mut self.current_block_iter { - if let Some(tx) = iter.next() { - info!("Got next transaction from block iterator {:?}", tx); - return Some(Ok(tx)); - } - // If we've exhausted the current block's transactions, clear the iterator - self.current_block_iter = None; - } - - // If we've reached the end, we're done - if self.version > self.latest_version { - return None; - } - - // Get the next block - let mut block_iterator = match self.executor.iter_blocks(self.version) { - Ok(iter) 
=> iter, - Err(e) => return Some(Err(e)), - }; - - // Get the next block and update version - let (_, end_version, block) = match block_iterator.next() { - Some(Ok(block_info)) => block_info, - Some(Err(e)) => return Some(Err(e)), - None => return None, - }; - self.version = end_version + 1; - - // Create an iterator for this block's transactions - self.current_block_iter = - Some(Box::new(block.transactions.into_txns().into_iter().map(|tx| tx.into_inner()))); - - // Recursively call next to get the first transaction from the new block - self.next() - } -} - -pub struct AccountAddressIterator<'a> { - executor: &'a MovementNode, - version: u64, - latest_version: u64, - current_tx_iter: Option> + 'a>>, -} - -impl<'a> AccountAddressIterator<'a> { - fn new(executor: &'a MovementNode, start_version: u64) -> Self { - Self { - executor, - version: start_version, - latest_version: executor.latest_ledger_version().unwrap_or(u64::MAX), - current_tx_iter: None, - } - } - - fn get_next_address(&mut self) -> Option> { - // If we don't have a transaction iterator, get one - info!("Getting next address from transaction iterator"); - if self.current_tx_iter.is_none() { - match self.executor.iter_transactions(self.version) { - Ok(iter) => { - // Create an iterator that extracts account addresses from transactions - info!("Creating iterator for transactions"); - let addresses_iter = iter.flat_map(|tx_result| { - match tx_result { - Ok(tx) => { - // Extract account address from transaction - match >::try_into(tx) { - Ok(t) => { - if let Ok(user_tx) = mtma_types::movement::aptos_types::transaction::SignedTransaction::try_from(t) { - vec![Ok(user_tx.sender())] - } else { - Vec::new() // Skip non-user transactions - } - } - _ => Vec::new(), // Skip non-user transactions - } - } - Err(e) => vec![Err(e)], - } - }); - self.current_tx_iter = Some(Box::new(addresses_iter)); - } - Err(e) => return Some(Err(e)), - } - } - - // Try to get the next address from our iterator - info!("Trying to 
get next address from iterator"); - if let Some(iter) = &mut self.current_tx_iter { - if let Some(addr_result) = iter.next() { - return Some(addr_result); - } - } - - // If we've exhausted the current iterator, move to next version - info!("Exhausted current iterator, moving to next version"); - self.current_tx_iter = None; - self.version += 1; - if self.version <= self.latest_version { - self.get_next_address() - } else { - None - } - } -} - -impl<'a> Iterator for AccountAddressIterator<'a> { - type Item = Result; - - fn next(&mut self) -> Option { - info!("Getting next address from account address iterator"); - self.get_next_address() - } } #[cfg(test)] @@ -490,19 +276,30 @@ mod test { // write a file that should be copied let file_path = temp_dir.path().join("maptos").join("test_file.txt"); - fs::create_dir_all(file_path.parent().context("failed to get parent directory for file that should be copied")?).context("failed to create directory")?; + fs::create_dir_all( + file_path + .parent() + .context("failed to get parent directory for file that should be copied")?, + ) + .context("failed to create directory")?; fs::write(file_path, "test").context("failed to write file that should be copied")?; // write a file that should not be copied let file_path = temp_dir.path().join("celestia").join("test_file2.txt"); - fs::create_dir_all(file_path.parent().context("failed to get parent directory for file that should not be copied")?).context("failed to create directory")?; + fs::create_dir_all( + file_path + .parent() + .context("failed to get parent directory for file that should not be copied")?, + ) + .context("failed to create directory")?; fs::write(file_path, "test").context("failed to write file that should not be copied")?; // create the target temp dir let dst = TempDir::new()?; // copy the file to a new dir, ignoring celestia directory - copy_dir_recursive_with_ignore(&temp_dir.path(), [PathBuf::from("celestia")], &dst.path()).context("failed to copy 
directory")?; + copy_dir_recursive_with_ignore(&temp_dir.path(), [PathBuf::from("celestia")], &dst.path()) + .context("failed to copy directory")?; // check that the file was copied assert!(dst.path().join("maptos").join("test_file.txt").exists()); @@ -517,24 +314,44 @@ mod test { // This indicates that failure is not due to the inability to ignore copy, but rather some issue performing an oepration that requires permissions. #[test] fn test_are_you_kidding_me() -> Result<(), anyhow::Error> { - let source_dir = TempDir::new()?; let target_dir = TempDir::new()?; - let path_that_must_be_ignored = source_dir.path().join(".movement/celestia/c1860ae680eb2d91927b/.celestia-app/keyring-test"); + let path_that_must_be_ignored = source_dir + .path() + .join(".movement/celestia/c1860ae680eb2d91927b/.celestia-app/keyring-test"); - fs::create_dir_all(path_that_must_be_ignored.parent().context("failed to get parent directory for path that must be ignored")?).context("failed to create directory")?; + fs::create_dir_all( + path_that_must_be_ignored + .parent() + .context("failed to get parent directory for path that must be ignored")?, + ) + .context("failed to create directory")?; // write a file that must not be ignored - fs::write(path_that_must_be_ignored.clone(), "test").context("failed to write file that must not be ignored")?; + fs::write(path_that_must_be_ignored.clone(), "test") + .context("failed to write file that must not be ignored")?; // set permissions to 000 on the file and then on the parent directory - fs::set_permissions(path_that_must_be_ignored.clone(), Permissions::from_mode(0o000)).context("failed to set permissions on file that must not be ignored")?; - fs::set_permissions(path_that_must_be_ignored.parent().context("failed to get parent directory for path that must be ignored")?, Permissions::from_mode(0o000)).context("failed to set permissions on parent directory that must not be ignored")?; - - copy_dir_recursive_with_ignore(source_dir.path(), 
["celestia"], target_dir.path()).context("failed to copy directory")?; - - assert!(!target_dir.path().join("celestia").join("c1860ae680eb2d91927b").join(".celestia-app").join("keyring-test").exists()); + fs::set_permissions(path_that_must_be_ignored.clone(), Permissions::from_mode(0o000)) + .context("failed to set permissions on file that must not be ignored")?; + fs::set_permissions( + path_that_must_be_ignored + .parent() + .context("failed to get parent directory for path that must be ignored")?, + Permissions::from_mode(0o000), + ) + .context("failed to set permissions on parent directory that must not be ignored")?; + + copy_dir_recursive_with_ignore(source_dir.path(), ["celestia"], target_dir.path()) + .context("failed to copy directory")?; + + assert!(!target_dir + .path() + .join("celestia") + .join("c1860ae680eb2d91927b") + .join(".celestia-app") + .join("keyring-test") + .exists()); Ok(()) } - -} \ No newline at end of file +} diff --git a/migration/util/node-types/src/executor/movement_executor/account_address.rs b/migration/util/node-types/src/executor/movement_executor/account_address.rs new file mode 100644 index 0000000..e3364b0 --- /dev/null +++ b/migration/util/node-types/src/executor/movement_executor/account_address.rs @@ -0,0 +1,114 @@ +use super::GlobalStateKeyIterable; +use super::MovementNode; +use mtma_types::movement::aptos_types::account_address::AccountAddress; +use mtma_types::movement::aptos_types::state_store::state_key::inner::StateKeyInner; +use tracing::debug; + +impl MovementNode { + /// Iterates over all account keys + /// NOTE: does this by checking the [AccessPath]s on all global storage keys. + pub fn iter_account_addresses(&self, version: Option) -> AccountAddressIterable { + AccountAddressIterable { + global_state_key_iterable: self.global_state_keys_from_version(version), + } + } +} + +/// An iterable of [AccountAddress]es extracted from global storage keys dating back to an original version. 
+pub struct AccountAddressIterable { + global_state_key_iterable: GlobalStateKeyIterable, +} + +impl AccountAddressIterable { + pub fn iter( + &self, + ) -> Result> + '_>, anyhow::Error> + { + let state_key_iter = self.global_state_key_iterable.iter()?; + + // Map over StateKey to extract AccountAddress using AccessPath + let addresses_iter = state_key_iter.flat_map(|state_key_result| match state_key_result { + Ok(state_key) => match state_key.inner() { + StateKeyInner::AccessPath(access_path) => { + debug!("Flat mapping state key to account address: {:?}", state_key); + vec![Ok(access_path.address)] + } + _ => vec![], + }, + Err(e) => vec![Err(e)], + }); + + Ok(Box::new(addresses_iter)) + } +} + +#[cfg(test)] +mod test { + use super::super::MovementOptExecutor; + use super::*; + use std::collections::HashSet; + use tracing::info; + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_iter_account_addresses() -> Result<(), anyhow::Error> { + // form the executor + let (movement_opt_executor, _temp_dir, _private_key, _receiver) = + MovementOptExecutor::try_generated().await?; + let movement_node = MovementNode::new(movement_opt_executor); + + // form the iterable + let account_address_iterable = movement_node.iter_account_addresses(Some(0)); + let account_addresses = account_address_iterable.iter()?; + + let mut found_account_addresses = HashSet::new(); + for account_address in account_addresses { + let account_address = account_address?; + info!("Account address: {:?}", account_address); + found_account_addresses.insert(account_address); + } + + let comparison_account_addresses = HashSet::from([ + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000001", + )?, + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000002", + )?, + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000003", + )?, + 
AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000004", + )?, + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000005", + )?, + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000006", + )?, + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000007", + )?, + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000008", + )?, + AccountAddress::from_hex_literal( + "0x0000000000000000000000000000000000000000000000000000000000000009", + )?, + AccountAddress::from_hex_literal( + "0x000000000000000000000000000000000000000000000000000000000000000a", + )?, + AccountAddress::from_hex_literal( + "0x000000000000000000000000000000000000000000000000000000000a550c18", + )?, + AccountAddress::from_hex_literal( + "0xd1126ce48bd65fb72190dbd9a6eaa65ba973f1e1664ac0cfba4db1d071fd0c36", + )?, + ]); + + assert_eq!(found_account_addresses, comparison_account_addresses); + + Ok(()) + } +} diff --git a/migration/util/node-types/src/executor/movement_executor/block.rs b/migration/util/node-types/src/executor/movement_executor/block.rs new file mode 100644 index 0000000..f6dc3a3 --- /dev/null +++ b/migration/util/node-types/src/executor/movement_executor/block.rs @@ -0,0 +1,59 @@ +use super::MovementNode; +use mtma_types::movement::aptos_types::{ + block_executor::partitioner::{ExecutableBlock, ExecutableTransactions}, + transaction::{signature_verified_transaction::into_signature_verified_block, Version}, +}; + +impl MovementNode { + /// Iterates over all blocks in the db. 
+ pub fn iter_blocks(&self, start_version: u64) -> Result, anyhow::Error> { + let latest_version = self.latest_ledger_version()?; + Ok(BlockIterator { executor: self, version: start_version, latest_version }) + } +} + +pub struct BlockIterator<'a> { + pub(crate) executor: &'a MovementNode, + pub(crate) version: u64, + pub(crate) latest_version: u64, +} + +impl<'a> Iterator for BlockIterator<'a> { + type Item = Result<(Version, Version, ExecutableBlock), anyhow::Error>; + + fn next(&mut self) -> Option { + if self.version > self.latest_version { + return None; + } + + let db_reader = self.executor.opt_executor().db_reader(); + let (start_version, end_version, new_block_event) = + match db_reader.get_block_info_by_version(self.version) { + Ok(info) => info, + Err(e) => return Some(Err(e.into())), + }; + + let mut transactions = Vec::new(); + for version in start_version..=end_version { + let transaction = + match db_reader.get_transaction_by_version(version, self.latest_version, false) { + Ok(t) => t, + Err(e) => return Some(Err(e.into())), + }; + transactions.push(transaction.transaction); + } + + let executable_transactions = + ExecutableTransactions::Unsharded(into_signature_verified_block(transactions)); + let block = ExecutableBlock::new( + match new_block_event.hash() { + Ok(hash) => hash, + Err(e) => return Some(Err(e.into())), + }, + executable_transactions, + ); + + self.version = end_version + 1; + Some(Ok((start_version, end_version, block))) + } +} diff --git a/migration/util/node-types/src/executor/movement_executor/global_state_key.rs b/migration/util/node-types/src/executor/movement_executor/global_state_key.rs new file mode 100644 index 0000000..6e56027 --- /dev/null +++ b/migration/util/node-types/src/executor/movement_executor/global_state_key.rs @@ -0,0 +1,50 @@ +use super::MovementNode; +use either::Either; +use mtma_types::movement::aptos_storage_interface::DbReader; +use mtma_types::movement::aptos_types::state_store::state_key::StateKey; +use 
std::sync::Arc; +use tracing::debug; + +impl MovementNode { + /// Gets the all [StateKey]s in the global storage dating back to an original version. None is treated as 0 or all versions. + pub fn global_state_keys_from_version(&self, version: Option) -> GlobalStateKeyIterable { + GlobalStateKeyIterable { + db_reader: self.opt_executor().db_reader(), + version: version.unwrap_or(0), + } + } +} + +/// An iterable of [StateKey]s in the global storage dating back to an original version. +/// +/// This helps deal with lifetime issues. +pub struct GlobalStateKeyIterable { + pub(crate) db_reader: Arc, + pub(crate) version: u64, +} + +const MAX_WRITE_SET_SIZE: u64 = 20_000; + +impl GlobalStateKeyIterable { + pub fn iter( + &self, + ) -> Result> + '_>, anyhow::Error> { + let write_set_iterator = + self.db_reader.get_write_set_iterator(self.version, MAX_WRITE_SET_SIZE)?; + + // We want to iterate lazily over the write set iterator because there could be a lot of them. + let mut count = 0; + let iter = write_set_iterator.flat_map(move |res| match res { + Ok(write_set) => { + debug!("Iterating over write set {}", count); + count += 1; + // It should be okay to collect because there should not be that many state keys in a write set. + let items: Vec<_> = write_set.iter().map(|(key, _)| Ok(key.clone())).collect(); + Either::Left(items.into_iter()) + } + Err(e) => Either::Right(std::iter::once(Err(e.into()))), + }); + + Ok(Box::new(iter)) + } +} diff --git a/migration/util/node-types/src/executor/movement_executor/transaction.rs b/migration/util/node-types/src/executor/movement_executor/transaction.rs new file mode 100644 index 0000000..10fcee8 --- /dev/null +++ b/migration/util/node-types/src/executor/movement_executor/transaction.rs @@ -0,0 +1,69 @@ +use super::MovementNode; +use mtma_types::movement::aptos_types::transaction::Transaction; +use tracing::info; + +impl MovementNode { + /// Iterates over all transactions in the db. 
+ pub fn iter_transactions( + &self, + start_version: u64, + ) -> Result, anyhow::Error> { + Ok(TransactionIterator::new(self, start_version, self.latest_ledger_version()?)) + } +} + +pub struct TransactionIterator<'a> { + pub(crate) executor: &'a MovementNode, + pub(crate) version: u64, + pub(crate) latest_version: u64, + pub(crate) current_block_iter: Option + 'a>>, +} + +impl<'a> TransactionIterator<'a> { + pub(crate) fn new(executor: &'a MovementNode, version: u64, latest_version: u64) -> Self { + Self { executor, version, latest_version, current_block_iter: None } + } +} + +impl<'a> Iterator for TransactionIterator<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + // If we have a current block iterator, try to get the next transaction from it + info!("Getting next transaction from block iterator"); + if let Some(iter) = &mut self.current_block_iter { + if let Some(tx) = iter.next() { + info!("Got next transaction from block iterator {:?}", tx); + return Some(Ok(tx)); + } + // If we've exhausted the current block's transactions, clear the iterator + self.current_block_iter = None; + } + + // If we've reached the end, we're done + if self.version > self.latest_version { + return None; + } + + // Get the next block + let mut block_iterator = match self.executor.iter_blocks(self.version) { + Ok(iter) => iter, + Err(e) => return Some(Err(e)), + }; + + // Get the next block and update version + let (_, end_version, block) = match block_iterator.next() { + Some(Ok(block_info)) => block_info, + Some(Err(e)) => return Some(Err(e)), + None => return None, + }; + self.version = end_version + 1; + + // Create an iterator for this block's transactions + self.current_block_iter = + Some(Box::new(block.transactions.into_txns().into_iter().map(|tx| tx.into_inner()))); + + // Recursively call next to get the first transaction from the new block + self.next() + } +} diff --git a/util/movement/core/src/movement.rs b/util/movement/core/src/movement.rs index 
70de6f2..45fcac3 100644 --- a/util/movement/core/src/movement.rs +++ b/util/movement/core/src/movement.rs @@ -11,6 +11,7 @@ pub mod faucet; pub mod rest_api; use std::path::PathBuf; +use anyhow::Context; use faucet::{Faucet, ParseFaucet}; use movement_signer_loader::identifiers::SignerIdentifier; use mtma_types::movement::movement_config::Config as MovementConfig; @@ -372,10 +373,12 @@ impl Movement { ParseFaucet::new(known_faucet_listen_url, self.ping_faucet), ); - // get the prepared command for the movement task - let mut command = Command::new( - self.workspace - .prepared_command( + let inner_command = + if tokio::fs::metadata(self.workspace_path().join(".movement/config.json")) + .await + .is_ok() + { + self.workspace.command( "nix", [ "develop", @@ -387,8 +390,26 @@ impl Movement { ), ], ) - .map_err(|e| MovementError::Internal(e.into()))?, - ); + } else { + self.workspace + .prepared_command( + "nix", + [ + "develop", + "--command", + "bash", + "-c", + &format!( + "echo '' > .env && just movement-full-node docker-compose {overlays}" + ), + ], + ) + .context("failed to prepare movement command") + .map_err(|e| MovementError::Internal(e.into()))? + }; + + // get the prepared command for the movement task + let mut command = Command::new(inner_command); info!( "Writing movement config to {:?}", @@ -396,6 +417,7 @@ impl Movement { ); let container_config = self .container_movement_config() + .context("failed to get container movement config") .map_err(|e| MovementError::Internal(e.into()))?; // Write the [MovementConfig] to the workspace path at {workspace_path}/.movement/config.json // Use tokio::fs::write to write the config to the file. 
@@ -413,13 +435,16 @@ impl Movement { tokio::fs::write( &config_path, serde_json::to_string(&container_config) + .context("failed to serialize movement config") .map_err(|e| MovementError::Internal(e.into()))?, ) .await + .context("failed to write movement config") .map_err(|e| MovementError::Internal(e.into()))?; // Set the permissions of the config file to 777 tokio::fs::set_permissions(&config_path, Permissions::from_mode(0o777)) .await + .context("failed to set permissions on movement config") .map_err(|e| MovementError::Internal(e.into()))?; info!("Wrote movement config"); @@ -429,6 +454,7 @@ impl Movement { Pipe::STDOUT, rest_api_fulfiller.sender().map_err(|e| MovementError::Internal(e.into()))?, ) + .context("failed to pipe command output to rest api fulfiller") .map_err(|e| MovementError::Internal(e.into()))?; // pipe command output to the faucet fulfiller @@ -437,6 +463,7 @@ impl Movement { Pipe::STDOUT, faucet_fulfiller.sender().map_err(|e| MovementError::Internal(e.into()))?, ) + .context("failed to pipe command output to faucet fulfiller") .map_err(|e| MovementError::Internal(e.into()))?; // start the rest_api_fulfiller @@ -492,6 +519,13 @@ impl Movement { .chain .maptos_private_key_signer_identifier } + + /// Resets the states of the [Movement] (forces them to be filled again). + pub async fn reset_states(&self) -> Result<(), MovementError> { + self.rest_api.write().reset().await; + self.faucet.write().reset().await; + Ok(()) + } } impl Drop for Movement {