diff --git a/Cargo.lock b/Cargo.lock index 813797a24..994e75986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14257,6 +14257,34 @@ dependencies = [ "tracing", ] +[[package]] +name = "movement-da-replica-node" +version = "0.3.4" +dependencies = [ + "anyhow", + "async-stream", + "bcs 0.1.6 (git+https://github.com/movementlabsxyz/bcs.git?rev=bc16d2d39cabafaabd76173dd1b04b2aa170cf0c)", + "dot-movement", + "futures", + "godfig", + "hex", + "maptos-execution-util", + "movement-da-sequencer-client", + "movement-da-sequencer-config", + "movement-da-sequencer-node", + "movement-da-sequencer-proto", + "movement-signer", + "movement-signer-loader", + "movement-types", + "poem", + "prost 0.13.3", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", + "url", +] + [[package]] name = "movement-da-sequencer-client" version = "0.3.4" @@ -14291,11 +14319,13 @@ dependencies = [ "ed25519-dalek 2.1.1", "godfig", "hex", + "maptos-execution-util", "rand 0.7.3", "serde", "serde_derive", "serde_json", "tracing", + "url", ] [[package]] @@ -14317,6 +14347,7 @@ dependencies = [ "futures", "godfig", "hex", + "maptos-execution-util", "movement-da-sequencer-client", "movement-da-sequencer-config", "movement-da-sequencer-proto", @@ -14440,6 +14471,7 @@ dependencies = [ "movement-config", "movement-da-light-node-client", "movement-da-light-node-setup", + "movement-da-replica-node", "movement-da-sequencer-client", "movement-da-sequencer-config", "movement-da-sequencer-node", diff --git a/Cargo.toml b/Cargo.toml index 71e82c390..666194616 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ members = [ "protocol-units/da-sequencer/config", "protocol-units/da-sequencer/client", "protocol-units/da-sequencer/node" -] +, "protocol-units/da-sequencer/replica"] [workspace.package] version = "0.3.4" @@ -85,6 +85,7 @@ movement-da-sequencer-client = { path = "protocol-units/da-sequencer/client" } movement-da-sequencer-config = { path = "protocol-units/da-sequencer/config" } movement-da-sequencer-node = { path = "protocol-units/da-sequencer/node" } movement-da-sequencer-proto = { path = "protocol-units/da-sequencer/proto" } +movement-da-replica-node = { path = "protocol-units/da-sequencer/replica" } # framework releases maptos-framework-release-util = { path = "protocol-units/execution/maptos/framework/releases/util" } diff --git a/docker/compose/movement-full-node/docker-compose.da-replicat.yml b/docker/compose/movement-full-node/docker-compose.da-replicat.yml new file mode 100644 index 000000000..716c7152f --- /dev/null +++ b/docker/compose/movement-full-node/docker-compose.da-replicat.yml @@ -0,0 +1,21 @@ +services: + movement-da-replicat: + image: ghcr.io/movementlabsxyz/movement-full-node:${CONTAINER_REV} + container_name: movement-da-replicat + command: da replicat + environment: + - DOT_MOVEMENT_PATH=/.movement + - MOVEMENT_TIMING=info + - RUST_BACKTRACE=1 + volumes: + - ${DOT_MOVEMENT_PATH}:/.movement + ports: + - "30730:30730" + - "30931:30931" + healthcheck: + test: [ "CMD-SHELL", "echo true" ] + retries: 10 + interval: 10s + timeout: 5s + restart: on-failure:5 + diff --git a/docs/movement-node/run-fullnode/scripts/setup_migrate.sh b/docs/movement-node/run-fullnode/scripts/setup_migrate.sh index 0506b62f3..4b455e03f 100755 --- a/docs/movement-node/run-fullnode/scripts/setup_migrate.sh +++ b/docs/movement-node/run-fullnode/scripts/setup_migrate.sh @@ -1,7 +1,7 @@ #!/bin/bash -e export DOT_MOVEMENT_PATH=$HOME/.movement -export CONTAINER_REV="a349ae1" +export CONTAINER_REV="c42fbb8" export MAYBE_RUN_LOCAL="false" /usr/bin/docker compose --env-file movement/.env -f movement/docker/compose/movement-full-node/docker-compose.fullnode_setup.yml up --force-recreate diff --git a/networks/movement/movement-full-node/Cargo.toml b/networks/movement/movement-full-node/Cargo.toml index a86b421ad..a3963dbca 100644 --- a/networks/movement/movement-full-node/Cargo.toml +++ b/networks/movement/movement-full-node/Cargo.toml @@ -9,8 +9,6 @@ homepage = { workspace = true } publish = { workspace = true } rust-version = { workspace = true } -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] maptos-dof-execution = { workspace = true } prost = { workspace = true } @@ -60,6 +58,8 @@ movement-da-sequencer-client = { workspace = true } movement-da-sequencer-node = { workspace = true } movement-da-light-node-setup = { workspace = true } movement-da-sequencer-config = { workspace = true } +movement-da-replica-node = { workspace = true } + ed25519-dalek = { workspace = true } mcr-settlement-setup = { workspace = true } diff --git a/networks/movement/movement-full-node/src/da/mod.rs b/networks/movement/movement-full-node/src/da/mod.rs index 9ac698c22..0005e4993 100644 --- a/networks/movement/movement-full-node/src/da/mod.rs +++ b/networks/movement/movement-full-node/src/da/mod.rs @@ -1,4 +1,5 @@ mod read_block; +mod replicat; mod run; pub mod stream_blocks; @@ -10,6 +11,7 @@ pub enum Da { StreamBlocks(stream_blocks::StreamBlocks), Run(run::DaRun), ReadBlock(read_block::ReadBlock), + Replicat(replicat::DaReplicatRun), } impl Da { @@ -18,6 +20,7 @@ impl Da { Da::StreamBlocks(stream_blocks) => stream_blocks.execute().await, Da::Run(da) => da.execute().await, Da::ReadBlock(da) => da.execute().await, + Da::Replicat(replicat) => replicat.execute().await, } } } diff --git a/networks/movement/movement-full-node/src/da/replicat.rs b/networks/movement/movement-full-node/src/da/replicat.rs new file mode 100644 index 000000000..cfeca3f44 --- /dev/null +++ b/networks/movement/movement-full-node/src/da/replicat.rs @@ -0,0 +1,17 @@ +use crate::common_args::MovementArgs; +use clap::Parser; + +#[derive(Debug, Parser, Clone)] +#[clap(rename_all = "kebab-case", about = "Runs Da Sequencer.")] +pub struct DaReplicatRun { + #[clap(flatten)] + pub movement_args: MovementArgs, +} + +impl DaReplicatRun { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + // get the config file + let dot_movement = self.movement_args.dot_movement()?; + movement_da_replica_node::start(dot_movement).await + } +} diff --git a/networks/movement/movement-full-node/src/node/partial.rs b/networks/movement/movement-full-node/src/node/partial.rs index e2e9b2c41..15fb3cbb2 100644 --- a/networks/movement/movement-full-node/src/node/partial.rs +++ b/networks/movement/movement-full-node/src/node/partial.rs @@ -1,4 +1,5 @@ -use crate::node::{da_db::DaDB, tasks}; +use crate::node::da_db::DaDB; +use crate::node::tasks; use maptos_dof_execution::MakeOptFinServices; use maptos_dof_execution::{v1::Executor, DynOptFinExecutor}; use maptos_opt_executor::executor::TxExecutionResult; diff --git a/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs b/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs index f3a6922e0..538dcdec0 100644 --- a/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs +++ b/networks/movement/movement-full-node/src/node/tasks/execute_settle.rs @@ -86,7 +86,7 @@ where let mut da_client = GrpcDaSequencerClient::try_connect(&da_connection_url, stream_heartbeat_interval_sec) .await?; - // TODO manage alert_channel in the issue #1169 + let (mut blocks_from_da, mut alert_channel) = da_client .stream_read_from_height(StreamReadFromHeightRequest { height: synced_height }) .await diff --git a/networks/movement/movement-full-node/src/setup/da/exec.rs b/networks/movement/movement-full-node/src/setup/da/exec.rs index 3f99db9ec..3f7e804b4 100644 --- a/networks/movement/movement-full-node/src/setup/da/exec.rs +++ b/networks/movement/movement-full-node/src/setup/da/exec.rs @@ -15,7 +15,7 @@ pub async fn exec() -> Result<(), anyhow::Error> { }?; //Define da-sequencer config path. - let pathbuff = movement_da_sequencer_config::get_config_path(&dot_movement); + let pathbuff = DaSequencerConfig::get_config_path(&dot_movement); dot_movement.set_path(pathbuff); // get a matching godfig object let config_file = dot_movement.try_get_or_create_config_file().await?; diff --git a/networks/movement/movement-full-node/src/setup/mod.rs b/networks/movement/movement-full-node/src/setup/mod.rs index a7651841f..d671359db 100644 --- a/networks/movement/movement-full-node/src/setup/mod.rs +++ b/networks/movement/movement-full-node/src/setup/mod.rs @@ -1,6 +1,7 @@ pub mod all; pub mod da; pub mod full_node; +pub mod replicat; use clap::Subcommand; @@ -10,6 +11,7 @@ pub enum Setup { All(all::All), FullNode(full_node::FullNode), Da(da::Da), + Replicat(replicat::Replicat), } impl Setup { @@ -18,6 +20,7 @@ impl Setup { Setup::All(all) => all.execute().await, Setup::FullNode(full_node) => full_node.execute().await, Setup::Da(da) => da.execute().await, + Setup::Replicat(replicat) => replicat.execute().await, } } } diff --git a/networks/movement/movement-full-node/src/setup/replicat/exec.rs b/networks/movement/movement-full-node/src/setup/replicat/exec.rs new file mode 100644 index 000000000..d8d304518 --- /dev/null +++ b/networks/movement/movement-full-node/src/setup/replicat/exec.rs @@ -0,0 +1,35 @@ +use crate::setup::replicat::local; +use godfig::{backend::config_file::ConfigFile, Godfig}; +use movement_da_sequencer_config::DaReplicatConfig; +use tracing::info; + +pub async fn exec() -> Result<(), anyhow::Error> { + info!("Starting Movement Full Node Setup"); + + // get the config file + let mut dot_movement = dot_movement::DotMovement::try_from_env()?; + + //Define da-sequencer config path. + let pathbuff = DaReplicatConfig::get_config_path(&dot_movement); + dot_movement.set_path(pathbuff); + // get a matching godfig object + let config_file = dot_movement.try_get_or_create_config_file().await?; + let godfig: Godfig = + Godfig::new(ConfigFile::new(config_file), vec![]); + + // run a godfig transaction to update the file + godfig + .try_transaction(|config| async move { + let mut config = config.unwrap_or(DaReplicatConfig::default()); + let local = std::env::var_os("MAYBE_RUN_LOCAL").unwrap_or("false".into()); + if local == "true" { + local::setup_movement_replica_node(&dot_movement, &mut config).await?; + } + tracing::info!("Da Sequencer Config after local setup: {:?}", config); + + Ok(Some(config)) + }) + .await?; + + Ok(()) +} diff --git a/networks/movement/movement-full-node/src/setup/replicat/local.rs b/networks/movement/movement-full-node/src/setup/replicat/local.rs new file mode 100644 index 000000000..b34025fcf --- /dev/null +++ b/networks/movement/movement-full-node/src/setup/replicat/local.rs @@ -0,0 +1,47 @@ +use godfig::backend::config_file::ConfigFile; +use godfig::Godfig; +use movement_da_sequencer_config::DaReplicatConfig; +use movement_da_sequencer_node::whitelist::Whitelist; +use movement_signer::{cryptography::ed25519::Ed25519, Signing}; +use movement_signer_loader::{Load, LoadedSigner}; + +pub async fn setup_movement_replica_node( + replicat_dot_movement: &dot_movement::DotMovement, + da_replicat_config: &mut DaReplicatConfig, +) -> Result<(), anyhow::Error> { + //update whitelist with node public key. + // Load Maptos config + let maptos_config = { + let dot_movement = dot_movement::DotMovement::try_from_env()?; + let config_file = dot_movement.try_get_or_create_config_file().await?; + let godfig: Godfig = + Godfig::new(ConfigFile::new(config_file), vec!["maptos_config".to_string()]); + godfig.try_wait_for_ready().await + }?; + + let loader: LoadedSigner = + maptos_config.da_sequencer.batch_signer_identifier.load().await?; + + let verifying_key = + ed25519_dalek::VerifyingKey::from_bytes(&loader.public_key().await?.to_bytes())?; + + let dotmovement_path = replicat_dot_movement.get_path().to_path_buf(); + let whitelist_path = + dotmovement_path.join(&da_replicat_config.da_sequencer.whitelist_relative_path); + if whitelist_path.exists() { + std::fs::remove_file(&whitelist_path)?; + } + Whitelist::save(&whitelist_path, &[verifying_key])?; + + // Register the full node has main node for state propagation. + let pk_str = hex::encode(verifying_key.to_bytes()); + + da_replicat_config.da_sequencer.main_node_verifying_key = Some(pk_str); + + //set the same batch identifier as the fullnode + da_replicat_config.da_client.batch_signer_identifier = + maptos_config.da_sequencer.batch_signer_identifier; + + tracing::info!("Da Sequencer local setup done."); + Ok(()) +} diff --git a/networks/movement/movement-full-node/src/setup/replicat/mod.rs b/networks/movement/movement-full-node/src/setup/replicat/mod.rs new file mode 100644 index 000000000..e29ebfeee --- /dev/null +++ b/networks/movement/movement-full-node/src/setup/replicat/mod.rs @@ -0,0 +1,13 @@ +use clap::Parser; + +mod exec; +mod local; + +#[derive(Parser, Debug)] +pub struct Replicat; + +impl Replicat { + pub async fn execute(&self) -> Result<(), anyhow::Error> { + exec::exec().await + } +} diff --git a/process-compose/movement-full-node/process-compose.replicat.yml b/process-compose/movement-full-node/process-compose.replicat.yml new file mode 100644 index 000000000..d6486eb24 --- /dev/null +++ b/process-compose/movement-full-node/process-compose.replicat.yml @@ -0,0 +1,53 @@ +version: "3" + +processes: + setup: + environment: + - "MAYBE_RUN_LOCAL=true" + - "MOVEMENT_DA_SEQUENCER_GRPC_LISTEN_ADDRESS=0.0.0.0:30830" + command: | + RUST_BACKTRACE=1 movement-full-node setup all + depends_on: + build: + condition: process_completed_successfully + + da-replicat-setup: + environment: + - "MAYBE_RUN_LOCAL=true" + - "MAPTOS_DA_SEQUENCER_CONNECTION_URL=http://0.0.0.0:30830" + - "MOVEMENT_DA_HEALTHCHECK_PORT=30932" + command: | + RUST_BACKTRACE=1 movement-full-node setup replicat + depends_on: + setup: + condition: process_completed_successfully + + replicat-node: + command: | + RUST_BACKTRACE=1 movement-full-node da replicat + depends_on: + da-sequencer: + condition: process_healthy + da-replicat-setup: + condition: process_completed_successfully + readiness_probe: + initial_delay_seconds: 30 + exec: + command: curl http://0.0.0.0:30932/health + + movement-full-node: + command: | + RUST_BACKTRACE=1 movement-full-node run + depends_on: + setup: + condition: process_completed_successfully + da-sequencer: + condition: process_healthy + replicat-node: + condition: process_healthy + readiness_probe: + initial_delay_seconds: 10 + exec: + command: curl http://0.0.0.0:30731 + ports: + - "9464:9464" # Expose metrics endpoint diff --git a/protocol-units/da-sequencer/config/Cargo.toml b/protocol-units/da-sequencer/config/Cargo.toml index a32289c47..e468fc086 100644 --- a/protocol-units/da-sequencer/config/Cargo.toml +++ b/protocol-units/da-sequencer/config/Cargo.toml @@ -20,6 +20,9 @@ tracing = { workspace = true } hex = { workspace = true } dot-movement = { workspace = true } anyhow = { workspace = true } +url = { workspace = true , features = ["serde"] } + +maptos-execution-util = { workspace = true } [lints] workspace = true diff --git a/protocol-units/da-sequencer/config/src/lib.rs b/protocol-units/da-sequencer/config/src/lib.rs index 6568eeb81..e33caa23c 100644 --- a/protocol-units/da-sequencer/config/src/lib.rs +++ b/protocol-units/da-sequencer/config/src/lib.rs @@ -1,17 +1,11 @@ use ed25519_dalek::VerifyingKey; -use godfig::env_default; +use godfig::backend::config_file::ConfigFile; +use godfig::{env_default, Godfig}; use hex::FromHex; +use maptos_execution_util::config::da_sequencer::Config as ClientDaSequencerConfig; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; -pub const DA_SEQUENCER_DIR: &str = "da-sequencer"; - -pub fn get_config_path(dot_movement: &dot_movement::DotMovement) -> std::path::PathBuf { - let mut pathbuff = std::path::PathBuf::from(dot_movement.get_path()); - pathbuff.push(DA_SEQUENCER_DIR); - pathbuff -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct DaSequencerConfig { #[serde(default = "default_grpc_listen_address")] @@ -37,6 +31,29 @@ pub struct DaSequencerConfig { } impl DaSequencerConfig { + pub const DA_SEQUENCER_DIR: &str = "da-sequencer"; + + pub fn get_config_path(dot_movement: &dot_movement::DotMovement) -> std::path::PathBuf { + let mut pathbuff = std::path::PathBuf::from(dot_movement.get_path()); + pathbuff.push(DaSequencerConfig::DA_SEQUENCER_DIR); + pathbuff + } + pub async fn try_from_env( + dot_movement: &mut dot_movement::DotMovement, + ) -> Result { + let pathbuff = DaSequencerConfig::get_config_path(&dot_movement); + tracing::info!("Start Da Sequencer with config file in {pathbuff:?}."); + dot_movement.set_path(pathbuff); + + let config_file = dot_movement.try_get_or_create_config_file().await?; + + // Get a matching godfig object + let godfig: Godfig = + Godfig::new(ConfigFile::new(config_file), vec![]); + let config: DaSequencerConfig = godfig.try_wait_for_ready().await?; + Ok(config) + } + pub fn get_main_node_verifying_key(&self) -> Result, anyhow::Error> { self.main_node_verifying_key .clone() @@ -54,6 +71,63 @@ impl DaSequencerConfig { } } +impl Default for DaSequencerConfig { + fn default() -> Self { + Self { + grpc_listen_address: default_grpc_listen_address(), + block_production_interval_millisec: default_block_production_interval_millisec(), + stream_heartbeat_interval_sec: default_stream_heartbeat_interval_sec(), + whitelist_relative_path: default_whitelist_relative_path(), + db_storage_relative_path: default_db_storage_relative_path(), + main_node_verifying_key: None, + healthcheck_bind_port: default_healthcheck_bind_port(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DaReplicatConfig { + #[serde(default)] + pub da_sequencer: DaSequencerConfig, + + #[serde(default)] + pub da_client: ClientDaSequencerConfig, +} + +impl DaReplicatConfig { + pub const DA_REPLICAT_DIR: &str = "da-replicat"; + pub fn get_config_path(dot_movement: &dot_movement::DotMovement) -> std::path::PathBuf { + let mut pathbuff = std::path::PathBuf::from(dot_movement.get_path()); + pathbuff.push(DaReplicatConfig::DA_REPLICAT_DIR); + pathbuff + } + + pub async fn try_from_env( + dot_movement: &mut dot_movement::DotMovement, + ) -> Result { + let pathbuff = DaReplicatConfig::get_config_path(&dot_movement); + tracing::info!("Start Da Sequencer with config file in {pathbuff:?}."); + dot_movement.set_path(pathbuff); + + let config_file = dot_movement.try_get_or_create_config_file().await?; + + // Get a matching godfig object + let godfig: Godfig = + Godfig::new(ConfigFile::new(config_file), vec![]); + let config: DaReplicatConfig = godfig.try_wait_for_ready().await?; + Ok(config) + } +} + +impl Default for DaReplicatConfig { + fn default() -> Self { + Self { + da_sequencer: DaSequencerConfig::default(), + da_client: ClientDaSequencerConfig::default(), + } + } +} + env_default!( default_grpc_listen_address, "MOVEMENT_DA_SEQUENCER_GRPC_LISTEN_ADDRESS", @@ -90,16 +164,12 @@ env_default!( "da-store".to_string() ); -impl Default for DaSequencerConfig { - fn default() -> Self { - Self { - grpc_listen_address: default_grpc_listen_address(), - block_production_interval_millisec: default_block_production_interval_millisec(), - stream_heartbeat_interval_sec: default_stream_heartbeat_interval_sec(), - whitelist_relative_path: default_whitelist_relative_path(), - db_storage_relative_path: default_db_storage_relative_path(), - main_node_verifying_key: None, - healthcheck_bind_port: default_healthcheck_bind_port(), - } - } -} +// The default Da Sequencer connection url +// env_default!( +// default_da_sequencer_connection_url, +// "MAPTOS_REPLICAT_DA_SEQUENCER_CONNECTION_URL", +// Url, +// "https://da-sequencer.mainnet.movementinfra.xyz" +// .parse() +// .expect("Bad da sequencer connection url.") +// ); diff --git a/protocol-units/da-sequencer/node/Cargo.toml b/protocol-units/da-sequencer/node/Cargo.toml index 1d9db4f0c..2861df28f 100644 --- a/protocol-units/da-sequencer/node/Cargo.toml +++ b/protocol-units/da-sequencer/node/Cargo.toml @@ -44,6 +44,8 @@ movement-da-sequencer-client = { workspace = true } movement-da-sequencer-config = { workspace = true } movement-da-sequencer-proto = { workspace = true, features = ["server"] } movement-types = { workspace = true } +maptos-execution-util = { workspace = true } + [dev-dependencies] tempfile = { workspace = true } diff --git a/protocol-units/da-sequencer/node/src/block.rs b/protocol-units/da-sequencer/node/src/block.rs index 0a0be4589..c7871df80 100644 --- a/protocol-units/da-sequencer/node/src/block.rs +++ b/protocol-units/da-sequencer/node/src/block.rs @@ -1,9 +1,9 @@ +use crate::error::DaSequencerError; +use movement_da_sequencer_proto::MainNodeState; use movement_types::block::{self, Block, Transactions}; use serde::{Deserialize, Serialize}; use std::ops::Add; -use crate::error::DaSequencerError; - // TODO: use a sensible value for the max sequencer block size pub const MAX_SEQUENCER_BLOCK_SIZE: u64 = 100_000_000; // 100 MB @@ -20,6 +20,16 @@ impl NodeState { } } +impl From<&MainNodeState> for NodeState { + fn from(main_node_state: &MainNodeState) -> Self { + NodeState { + block_height: main_node_state.block_height, + ledger_timestamp: main_node_state.ledger_timestamp, + ledger_version: main_node_state.ledger_version, + } + } +} + #[derive( Serialize, Deserialize, Clone, Copy, Default, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, )] @@ -64,10 +74,8 @@ pub struct SequencerBlock { } impl SequencerBlock { - /// Try to construct a SequencerBlock, but fail if it exceeds the max encoded size. - pub fn try_new(height: BlockHeight, block: Block) -> Result { - let sb = SequencerBlock { height, block }; - Ok(sb) + pub fn new(height: BlockHeight, block: Block) -> Self { + SequencerBlock { height, block } } pub fn id(&self) -> block::Id { diff --git a/protocol-units/da-sequencer/node/src/celestia/mod.rs b/protocol-units/da-sequencer/node/src/celestia/mod.rs index 5de75bab3..5a03c029c 100644 --- a/protocol-units/da-sequencer/node/src/celestia/mod.rs +++ b/protocol-units/da-sequencer/node/src/celestia/mod.rs @@ -447,7 +447,7 @@ mod tests { let mut state = self.0.write().await; state.add_call(BlockProviderCalls::RequestBlockAtHeight(height)); let block = state.get_at_height(height.into()).unwrap(); - Ok(SequencerBlock::try_new(height, block)?) + Ok(SequencerBlock::new(height, block)) } async fn request_block_with_id( @@ -457,7 +457,7 @@ mod tests { let mut state = self.0.write().await; state.add_call(BlockProviderCalls::RequestBlockForId(id)); let (height, block) = state.get_for_id(&id).unwrap(); - Ok(SequencerBlock::try_new(BlockHeight::from(height), block)?) + Ok(SequencerBlock::new(BlockHeight::from(height), block)) } } diff --git a/protocol-units/da-sequencer/node/src/lib.rs b/protocol-units/da-sequencer/node/src/lib.rs index 696e433d1..e07ebfb59 100644 --- a/protocol-units/da-sequencer/node/src/lib.rs +++ b/protocol-units/da-sequencer/node/src/lib.rs @@ -5,16 +5,13 @@ use crate::error::DaSequencerError; use crate::server::run_server; use crate::server::GrpcRequests; use crate::server::ProducedData; -use crate::storage::DaSequencerStorage; -use crate::storage::Storage; +use crate::storage::{DaSequencerStorage, Storage}; use crate::whitelist::Whitelist; use anyhow::Context; use futures::future::Either; use futures::stream::FuturesUnordered; -use godfig::{backend::config_file::ConfigFile, Godfig}; use movement_da_sequencer_config::DaSequencerConfig; -use tokio::signal::unix::signal; -use tokio::signal::unix::SignalKind; +use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio_stream::StreamExt; @@ -23,7 +20,7 @@ pub mod batch; pub mod block; pub mod celestia; pub mod error; -mod healthcheck; +pub mod healthcheck; pub mod server; pub mod storage; #[cfg(test)] @@ -33,16 +30,7 @@ pub mod whitelist; pub const GRPC_REQUEST_CHANNEL_SIZE: usize = 1000; pub async fn start(mut dot_movement: dot_movement::DotMovement) -> Result<(), anyhow::Error> { - let pathbuff = movement_da_sequencer_config::get_config_path(&dot_movement); - tracing::info!("Start Da Sequencer with config file in {pathbuff:?}."); - dot_movement.set_path(pathbuff); - - let config_file = dot_movement.try_get_or_create_config_file().await?; - - // Get a matching godfig object - let godfig: Godfig = - Godfig::new(ConfigFile::new(config_file), vec![]); - let da_sequencer_config: DaSequencerConfig = godfig.try_wait_for_ready().await?; + let da_sequencer_config = DaSequencerConfig::try_from_env(&mut dot_movement).await?; let dotmovement_path = dot_movement.get_path().to_path_buf(); diff --git a/protocol-units/da-sequencer/node/src/storage/mod.rs b/protocol-units/da-sequencer/node/src/storage/mod.rs index e8e4c780c..f115c03fa 100644 --- a/protocol-units/da-sequencer/node/src/storage/mod.rs +++ b/protocol-units/da-sequencer/node/src/storage/mod.rs @@ -248,7 +248,7 @@ impl DaSequencerStorage for Storage { let tx_set: BTreeSet<_> = selected_txs.into_iter().collect(); let block = Block::new(BlockMetadata::default(), parent_id, tx_set); - let sequencer_block = SequencerBlock::try_new(height, block)?; + let sequencer_block = SequencerBlock::new(height, block); tracing::info!( "Producing new block: id:{} height:{} nb Tx:{}", sequencer_block.id(), @@ -502,7 +502,7 @@ mod tests { // Construct a dummy block to save let height = BlockHeight(1); let block = Block::new(BlockMetadata::default(), block::Id::default(), [tx.clone()].into()); - let sequencer_block = SequencerBlock::try_new(height, block).expect("valid block"); + let sequencer_block = SequencerBlock::new(height, block); // Save the block and remove the pending tx storage @@ -541,7 +541,7 @@ mod tests { let block_height = BlockHeight(42); let dummy_block = Block::default(); - let sequencer_block = SequencerBlock::try_new(block_height, dummy_block.clone()).unwrap(); + let sequencer_block = SequencerBlock::new(block_height, dummy_block.clone()); let encoded_block = bcs::to_bytes(&sequencer_block).expect("failed to serialize SequencerBlock"); @@ -569,7 +569,7 @@ mod tests { let block_height = BlockHeight(99); let dummy_block = Block::default(); - let sequencer_block = SequencerBlock::try_new(block_height, dummy_block.clone()).unwrap(); + let sequencer_block = SequencerBlock::new(block_height, dummy_block.clone()); let id = sequencer_block.id(); let encoded_block = diff --git a/protocol-units/da-sequencer/node/src/tests/mock.rs b/protocol-units/da-sequencer/node/src/tests/mock.rs index f69f89894..6a54e6461 100644 --- a/protocol-units/da-sequencer/node/src/tests/mock.rs +++ b/protocol-units/da-sequencer/node/src/tests/mock.rs @@ -119,7 +119,7 @@ impl DaSequencerStorage for StorageMock { let block = Block::new(BlockMetadata::default(), inner.parent_block_id, tx_list); inner.parent_block_id = block.id(); inner.current_height += 1; - let sequencer_block = SequencerBlock::try_new(BlockHeight(inner.current_height), block)?; + let sequencer_block = SequencerBlock::new(BlockHeight(inner.current_height), block); inner.produced_blocks.push(sequencer_block.clone()); tracing::info!("Mock Storage produce block at height:{}", inner.current_height); Ok(Some(sequencer_block)) diff --git a/protocol-units/da-sequencer/replica/Cargo.toml b/protocol-units/da-sequencer/replica/Cargo.toml new file mode 100644 index 000000000..5b976fd50 --- /dev/null +++ b/protocol-units/da-sequencer/replica/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "movement-da-replica-node" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +repository.workspace = true +homepage.workspace = true +publish.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = { workspace = true } +hex = { workspace = true } +prost = { workspace = true } +bcs = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } +tokio-stream = { workspace = true } +async-stream = { workspace = true } +tracing = { workspace = true } +url = { workspace = true } +futures = { workspace = true } +dot-movement = { workspace = true } +godfig = { workspace = true } +poem = { workspace = true } + +movement-da-sequencer-client = { workspace = true } +movement-da-sequencer-node = { workspace = true } +movement-da-sequencer-config = { workspace = true } +movement-da-sequencer-proto = { workspace = true, features = ["server"] } +movement-types = { workspace = true } +movement-signer-loader = { workspace = true } +movement-signer = { workspace = true } +maptos-execution-util = { workspace = true } + +[lints] +workspace = true diff --git a/protocol-units/da-sequencer/replica/src/lib.rs b/protocol-units/da-sequencer/replica/src/lib.rs new file mode 100644 index 000000000..28a839e90 --- /dev/null +++ b/protocol-units/da-sequencer/replica/src/lib.rs @@ -0,0 +1,261 @@ +use anyhow::Context; +use futures::stream::FuturesUnordered; +use movement_da_sequencer_client::DaSequencerClient; +use movement_da_sequencer_client::GrpcDaSequencerClient; +use movement_da_sequencer_config::DaReplicatConfig; +use movement_da_sequencer_node::block::BlockHeight; +use movement_da_sequencer_node::block::NodeState; +use movement_da_sequencer_node::block::SequencerBlock; +use movement_da_sequencer_node::error::DaSequencerError; +use movement_da_sequencer_node::server::run_server; +use movement_da_sequencer_node::server::GrpcRequests; +use movement_da_sequencer_node::server::ProducedData; +use movement_da_sequencer_node::storage::{DaSequencerStorage, Storage}; +use movement_da_sequencer_node::whitelist::Whitelist; +use movement_da_sequencer_node::GRPC_REQUEST_CHANNEL_SIZE; +use movement_da_sequencer_proto::BatchWriteRequest; +use movement_da_sequencer_proto::StreamReadFromHeightRequest; +use movement_signer::cryptography::ed25519::Ed25519; +use movement_signer_loader::{Load, LoadedSigner}; +use movement_types::block::Block; +use tokio::select; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::mpsc; +use tokio_stream::StreamExt; + +pub async fn start(mut dot_movement: dot_movement::DotMovement) -> Result<(), anyhow::Error> { + // Signal management + let mut sigterm = signal(SignalKind::terminate()).context("can't register to SIGTERM.")?; + let mut sigint = signal(SignalKind::interrupt()).context("can't register to SIGKILL.")?; + let mut sigquit = signal(SignalKind::quit()).context("can't register to SIGKILL.")?; + + let da_sequencer_config = DaReplicatConfig::try_from_env(&mut dot_movement).await?; + + // Init block storage + let dotmovement_path = dot_movement.get_path().to_path_buf(); + let db_storage_path = + dotmovement_path.join(&da_sequencer_config.da_sequencer.db_storage_relative_path); + let storage = Storage::try_new(&db_storage_path)?; + + // Create da sequencer client to stream block + + //Connect to the main DA sequencer to get all missing block and produced one. + let mut da_client = + GrpcDaSequencerClient::try_connect(&da_sequencer_config.da_client.connection_url, 10) + .await?; + + let last_synced_height = storage.get_current_block_height()? + 1; + let (mut blocks_from_da, mut alert_channel) = da_client + .stream_read_from_height(StreamReadFromHeightRequest { height: last_synced_height.into() }) + .await + .map_err(|e| { + tracing::error!("Failed to stream blocks from DA: {:?}", e); + e + })?; + + //start grpc entry point + // Initialize whitelist + let whitelist_path = + dotmovement_path.join(&da_sequencer_config.da_sequencer.whitelist_relative_path); + let whitelist = Whitelist::from_file_and_spawn_reload_thread(whitelist_path)?; + + let (request_tx, mut request_rx) = mpsc::channel(GRPC_REQUEST_CHANNEL_SIZE); + // Start gprc server + let grpc_address = da_sequencer_config.da_sequencer.grpc_listen_address; + let verifying_key = da_sequencer_config.da_sequencer.get_main_node_verifying_key()?; + + let mut grpc_jh = tokio::spawn(async move { + run_server(grpc_address, request_tx, whitelist, verifying_key).await + }); + + // Load batch signer + let da_batch_signer = da_sequencer_config.da_client.batch_signer_identifier.clone(); + + // Start healthcheck entry point + let healthcheck_url = + format!("0.0.0.0:{}", da_sequencer_config.da_sequencer.healthcheck_bind_port); + let (rest_health_tx, mut rest_health_rx) = tokio::sync::mpsc::channel(10); + let rest_service = movement_da_sequencer_node::healthcheck::HealthCheckRest::new( + healthcheck_url, + rest_health_tx, + )?; + let rest_service_future = rest_service.run_service(); + let mut rest_jh = tokio::spawn(rest_service_future); + + // Some processing vars + let mut spawn_result_futures = FuturesUnordered::new(); + let mut connected_grpc_sender = vec![]; + let mut da_stream_heartbeat_interval = tokio::time::interval(tokio::time::Duration::from_secs( + da_sequencer_config.da_sequencer.stream_heartbeat_interval_sec, + )); + + loop { + select! { + next_block = blocks_from_da.next() => { + match next_block { + None => { + tracing::error!("Da stream return none, stream broken"); + break; + } + Some(res) => { + let da_block = res.context("failed to get next block from DA")?; + let da_block_height: BlockHeight = da_block.height.into(); + let node_state: Option = da_block.node_state.as_ref().map(|state| state.into()); + + tracing::info!("Receive block at height from DA: {:?}", da_block_height); + let block: Block = bcs::from_bytes(&da_block.data[..])?; + let sequencer_block = SequencerBlock::new(da_block_height, block); + //save the block + let start_jh = tokio::task::spawn_blocking({ + let storage = storage.clone(); + let sequencer_block = sequencer_block.clone(); + move || { + storage.save_block(&sequencer_block, None) + }}); + spawn_result_futures.push(start_jh); + + // Send the block to all registered full node + // For now send to the main loop because there are very few followers (<100). + tracing::info!(sender_len = %connected_grpc_sender.len(), block_height= %sequencer_block.height().0, "New block produced, sent to fullnodes."); + + stream_block_to_sender(&mut connected_grpc_sender, ProducedData::Block(sequencer_block, node_state.clone())).await; + } + } + } + // Manage grpc request. + Some(grpc_request) = request_rx.recv() => { + match grpc_request { + GrpcRequests::StartBlockStream(proposed_block_tx, curent_height_callback) => { + connected_grpc_sender.push(proposed_block_tx); + + // Send back the current height. + let start_jh = tokio::task::spawn_blocking({ + let storage = storage.clone(); + move || { + let current_height = storage.get_current_block_height()?; + let _ = curent_height_callback.send(current_height); + Ok::<(), DaSequencerError>(()) + }}); + spawn_result_futures.push(start_jh); + }, + GrpcRequests::GetBlockHeight(block_height, callback) => { + let get_block_jh = tokio::task::spawn_blocking({ + let storage = storage.clone(); + move || {storage.get_block_at_height(block_height)} + }); + tokio::spawn(async move { + let result = get_block_jh.await; + // Manage result. + let to_send = match result { + Err(err) => { + tracing::error!(error = %err, "spawn_blocking task failed."); + None + } + Ok(Err(err)) => { + tracing::error!(error = %err, "Storage get_block_at_height return an error."); + None + + } + Ok(Ok(block)) => block, + }; + + let _ = callback.send(to_send); + }); + }, + GrpcRequests::WriteBatch(batch) => { + // All batch verification has been done, propagate the batch to the da-sequencer with replicat key + if !batch.data().0.is_empty() { + // Build batch and submit request. + tracing::info!("Propagate new batch with {} txs.", batch.data().0.len()); + let loader: LoadedSigner = da_batch_signer.load().await?; + + //send the batch in a separate task to avoid to slow the loop. + let handle = tokio::spawn({ + let mut client = da_client.clone(); + async move { + let batch_bytes = bcs::to_bytes(&batch.data().0).expect("Serialization failed"); + let encoded = + movement_da_sequencer_client::sign_and_encode_batch(batch_bytes, &loader) + .await + .unwrap(); + client.batch_write(BatchWriteRequest { data: encoded }).await.map_err(|status| { + tracing::warn!("Send Batch to Da failed because: {status:?}"); + DaSequencerError::SendFailure} + )?; + Ok(()) + } + }); + spawn_result_futures.push(handle); + } + + }, + GrpcRequests::SendState(_state) => (), // can't send node state with replicat. + + } + } + // Every tick will produce a heartbeat. + _ = da_stream_heartbeat_interval.tick() => { + tracing::info!(sender_len = %connected_grpc_sender.len(), "Produced a heartbeat, sent to fullnodes"); + stream_block_to_sender(&mut connected_grpc_sender, ProducedData::HeartBeat).await; + + } + // Manage health check request. + Some(oneshot_tx) = rest_health_rx.recv() => { + //Basic monitoring, always true if the loop run. + if let Err(err) = oneshot_tx.send(true){ + tracing::warn!("Heal check oneshot channel closed abnormally :{err:?}"); + } + } + _ = alert_channel.recv() => { + tracing::error!("Da client stream channel timeout because it's idle. Exit"); + break; + } + _ = sigterm.recv() => { + tracing::error!("Reveived sigterm, exiting"); + break; + }, + _ = sigint.recv() => { + tracing::error!("Reveived sigint, exiting"); + break; + } + _ = sigquit.recv() => { + tracing::error!("Reveived sigquit, exiting"); + break; + } + // Manage futures result. + Some(Ok(res)) = spawn_result_futures.next() => { + // just log for now, add more logic later. + if let Err(err) = res { + tracing::error!(error = %err, "Error during future execution."); + } + } + res = &mut grpc_jh => { + tracing::error!("Grpc server exit because :{res:?}"); + break; + } + res = &mut rest_jh => { + tracing::error!("Health check server exit because :{res:?}"); + break; + } + + else => break, + } + } + anyhow::bail!("Block execution loop break. Node need to be restarted.") +} + +async fn stream_block_to_sender( + senders: &mut Vec>, + data: ProducedData, +) { + let mut new_sender = vec![]; + for sender in senders.drain(..) { + // Remove the sender in error because it means the client was disconnected. + if let Err(err) = sender.send(data.clone()) { + tracing::warn!("Failed to send block to grpc client. Client disconnected. remove connection :{err}"); + } else { + new_sender.push(sender); + } + } + *senders = new_sender; +} diff --git a/protocol-units/execution/maptos/util/src/config/da_sequencer.rs b/protocol-units/execution/maptos/util/src/config/da_sequencer.rs index a8b8a2647..49623e625 100644 --- a/protocol-units/execution/maptos/util/src/config/da_sequencer.rs +++ b/protocol-units/execution/maptos/util/src/config/da_sequencer.rs @@ -8,7 +8,7 @@ use url::Url; /// Configuration for the DA Sequencer. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Config { - /// The number of milliseconds a sequence number is valid for. + /// Url to connect ot the da-sequencer. #[serde(default = "default_da_sequencer_connection_url")] pub connection_url: Url, diff --git a/scripts/services/movement-full-node/build b/scripts/services/movement-full-node/build index 40a1fb063..c22d0ac6a 100755 --- a/scripts/services/movement-full-node/build +++ b/scripts/services/movement-full-node/build @@ -7,14 +7,6 @@ else CARGO_PROFILE_FLAGS="" fi -#echo "Building movement-celestia-da-light-node..." -#cargo build $CARGO_PROFILE_FLAGS -p movement-celestia-da-light-node --features "sequencer" -#echo "Built movement-celestia-da-light-node!" - -echo "Building movement-celestia-* runners..." -cargo build $CARGO_PROFILE_FLAGS -p movement-celestia-da-light-node-runners -echo "Built movement-celestia-* runners!" - echo "Building movement-full-node..." cargo build $CARGO_PROFILE_FLAGS -p movement-full-node echo "Built movement-full-node!" @@ -23,7 +15,7 @@ echo "Building movement-faucet-service..." cargo build $CARGO_PROFILE_FLAGS -p movement-faucet-service echo "Built movement-faucet-service!" -echo "Building da-sequencer-node..." -cargo build $CARGO_PROFILE_FLAGS -p movement-da-sequencer-node -echo "Built da-sequencer-node" +echo "Building end 2 end test ..." +cargo build $CARGO_PROFILE_FLAGS -p movement-client +echo "Building end 2 end test!"