diff --git a/Cargo.lock b/Cargo.lock index 26a634ea94..814f16cf48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5599,6 +5599,7 @@ dependencies = [ "opentelemetry_sdk", "rand_chacha 0.3.1", "rayon", + "reqwest 0.11.27", "thiserror 1.0.69", "tikv-jemallocator", "tokio", diff --git a/monad-node-config/src/lib.rs b/monad-node-config/src/lib.rs index 93500283c1..0e0237925e 100644 --- a/monad-node-config/src/lib.rs +++ b/monad-node-config/src/lib.rs @@ -89,4 +89,7 @@ pub type ForkpointConfig = monad_consensus_types::checkpoint::Checkpoint< ExecutionProtocolType, >; #[cfg(feature = "crypto")] +pub type ValidatorsConfigType = + monad_consensus_types::validator_data::ValidatorsConfig; +#[cfg(feature = "crypto")] pub type MonadNodeConfig = NodeConfig; diff --git a/monad-node/Cargo.toml b/monad-node/Cargo.toml index db25d1b859..08b821ecbe 100644 --- a/monad-node/Cargo.toml +++ b/monad-node/Cargo.toml @@ -51,6 +51,7 @@ opentelemetry-otlp = { workspace = true, features = ["metrics", "grpc-tonic"] } opentelemetry-semantic-conventions = { workspace = true } rand_chacha = { workspace = true } rayon = { workspace = true } +reqwest = { workspace = true, features = ["blocking"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "signal"] } toml = { workspace = true } diff --git a/monad-node/src/main.rs b/monad-node/src/main.rs index 142acf9c51..87ccd6b600 100644 --- a/monad-node/src/main.rs +++ b/monad-node/src/main.rs @@ -22,18 +22,14 @@ use std::{ time::{Duration, Instant}, }; -use agent::AgentBuilder; use alloy_rlp::{Decodable, Encodable}; use chrono::Utc; use clap::CommandFactory; use futures_util::{FutureExt, StreamExt}; use monad_chain_config::ChainConfig; use monad_consensus_state::ConsensusConfig; -use monad_consensus_types::{ - metrics::Metrics, - validator_data::{ValidatorSetDataWithEpoch, ValidatorsConfig}, -}; -use monad_control_panel::{ipc::ControlPanelIpcReceiver, TracingReload}; +use monad_consensus_types::{metrics::Metrics, validator_data::ValidatorSetDataWithEpoch}; +use monad_control_panel::ipc::ControlPanelIpcReceiver; use monad_crypto::{ certificate_signature::{ CertificateKeyPair, CertificateSignaturePubKey, CertificateSignatureRecoverable, PubKey, @@ -79,12 +75,6 @@ use opentelemetry_otlp::{MetricExporter, WithExportConfig}; use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng}; use tokio::signal::unix::{signal, SignalKind}; use tracing::{error, event, info, warn, Instrument, Level}; -use tracing_manytrace::{ManytraceLayer, TracingExtension}; -use tracing_subscriber::{ - fmt::{format::FmtSpan, Layer as FmtLayer}, - layer::SubscriberExt, - Layer, -}; use self::{cli::Cli, error::NodeSetupError, state::NodeState}; @@ -124,9 +114,6 @@ fn main() { .map_err(Into::into) .unwrap_or_else(|e: NodeSetupError| cmd.error(e.kind(), e).exit()); - let (reload_handle, _agent) = setup_tracing(&node_state) - .unwrap_or_else(|e: NodeSetupError| cmd.error(e.kind(), e).exit()); - drop(cmd); MONAD_NODE_VERSION.map(|v| info!("starting monad-bft with version {}", v)); @@ -149,70 +136,14 @@ fn main() { }); } - if let Err(e) = runtime.block_on(run(node_state, reload_handle)) { + if let Err(e) = runtime.block_on(run(node_state)) { tracing::error!("monad consensus node crashed: {:?}", e); } } -fn setup_tracing( - node_state: &NodeState, -) -> Result<(Box, Option), NodeSetupError> { - if let Some(socket_path) = &node_state.manytrace_socket { - let extension = std::sync::Arc::new(TracingExtension::new()); - let agent = AgentBuilder::new(socket_path.clone()) - .register_tracing(Box::new((*extension).clone())) - .build() - .map_err(|e| NodeSetupError::Custom { - kind: clap::error::ErrorKind::Io, - msg: format!("failed to build manytrace agent: {}", e), - })?; - let (filter, reload_handle) = tracing_subscriber::reload::Layer::new( - tracing_subscriber::EnvFilter::from_default_env(), - ); - let subscriber = tracing_subscriber::Registry::default() - .with(ManytraceLayer::new(extension)) - .with( - FmtLayer::default() - .json() - .with_span_events(FmtSpan::NONE) - .with_current_span(false) - .with_span_list(false) - .with_writer(std::io::stdout) - .with_ansi(false) - .with_filter(filter), - ); - - tracing::subscriber::set_global_default(subscriber)?; - info!("manytrace tracing enabled"); - Ok((Box::new(reload_handle), Some(agent))) - } else { - let (filter, reload_handle) = tracing_subscriber::reload::Layer::new( - tracing_subscriber::EnvFilter::from_default_env(), - ); - - let subscriber = tracing_subscriber::Registry::default().with(filter).with( - FmtLayer::default() - .json() - .with_span_events(FmtSpan::NONE) - .with_current_span(false) - .with_span_list(false) - .with_writer(std::io::stdout) - .with_ansi(false), - ); - - tracing::subscriber::set_global_default(subscriber)?; - Ok((Box::new(reload_handle), None)) - } -} - -async fn run(node_state: NodeState, reload_handle: Box) -> Result<(), ()> { - let locked_epoch_validators = ValidatorsConfig::read_from_path(&node_state.validators_path) - .unwrap_or_else(|err| { - panic!( - "failed to read/parse validators_path={:?}, err={:?}", - &node_state.validators_path, err - ) - }) +async fn run(node_state: NodeState) -> Result<(), ()> { + let locked_epoch_validators = node_state + .validators_config .get_locked_validator_sets(&node_state.forkpoint_config); let current_epoch = node_state @@ -341,7 +272,7 @@ async fn run(node_state: NodeState, reload_handle: Box) -> Re .expect("txpool ipc succeeds"), control_panel: ControlPanelIpcReceiver::new( node_state.control_panel_ipc_path, - reload_handle, + node_state.reload_handle, 1000, ) .expect("uds bind failed"), @@ -397,8 +328,8 @@ async fn run(node_state: NodeState, reload_handle: Box) -> Re key: node_state.secp256k1_identity, certkey: node_state.bls12_381_identity, beneficiary: node_state.node_config.beneficiary.into(), - locked_epoch_validators, forkpoint: node_state.forkpoint_config.into(), + locked_epoch_validators, block_sync_override_peers, consensus_config: ConsensusConfig { execution_delay: SeqNum(EXECUTION_DELAY), diff --git a/monad-node/src/state.rs b/monad-node/src/state.rs index da329c3888..f86c856cb8 100644 --- a/monad-node/src/state.rs +++ b/monad-node/src/state.rs @@ -14,25 +14,40 @@ // along with this program. If not, see . use std::{ + env, path::{Path, PathBuf}, + str::FromStr, time::Duration, }; +use agent::AgentBuilder; use clap::{error::ErrorKind, FromArgMatches}; use monad_bls::BlsKeyPair; use monad_chain_config::MonadChainConfig; +use monad_control_panel::TracingReload; use monad_keystore::keystore::Keystore; -use monad_node_config::{ForkpointConfig, MonadNodeConfig}; +use monad_node_config::{ForkpointConfig, MonadNodeConfig, ValidatorsConfigType}; use monad_secp::KeyPair; -use tracing::info; +use monad_types::Round; +use reqwest::{blocking::Client, Url}; +use tracing::{info, warn}; +use tracing_manytrace::{ManytraceLayer, TracingExtension}; +use tracing_subscriber::{ + fmt::{format::FmtSpan, Layer as FmtLayer}, + layer::SubscriberExt, + Layer, +}; use crate::{cli::Cli, error::NodeSetupError}; +const REMOTE_FORKPOINT_URL_ENV: &str = "REMOTE_FORKPOINT_URL"; +const REMOTE_VALIDATORS_URL_ENV: &str = "REMOTE_VALIDATORS_URL"; + pub struct NodeState { pub node_config: MonadNodeConfig, pub node_config_path: PathBuf, pub forkpoint_config: ForkpointConfig, - pub validators_path: PathBuf, + pub validators_config: ValidatorsConfigType, pub chain_config: MonadChainConfig, pub secp256k1_identity: KeyPair, @@ -40,6 +55,7 @@ pub struct NodeState { pub bls12_381_identity: BlsKeyPair, pub forkpoint_path: PathBuf, + pub validators_path: PathBuf, pub wal_path: PathBuf, pub ledger_path: PathBuf, pub mempool_ipc_path: PathBuf, @@ -50,7 +66,7 @@ pub struct NodeState { pub otel_endpoint_interval: Option<(String, Duration)>, pub pprof: String, - pub manytrace_socket: Option, + pub reload_handle: Box, } impl NodeState { @@ -60,7 +76,7 @@ impl NodeState { secp_identity, node_config: node_config_path, forkpoint_config: forkpoint_config_path, - validators_path, + validators_path: validators_config_path, devnet_chain_config_override: maybe_devnet_chain_config_override_path, wal_path, ledger_path, @@ -76,6 +92,8 @@ impl NodeState { manytrace_socket, } = Cli::from_arg_matches_mut(&mut cmd.get_matches_mut())?; + let (reload_handle, _agent) = NodeState::setup_tracing(manytrace_socket)?; + let keystore_password = keystore_password.as_deref().unwrap_or(""); let secp_key = load_secp256k1_keypair(&secp_identity, keystore_password)?; @@ -101,8 +119,10 @@ impl NodeState { let node_config: MonadNodeConfig = toml::from_str(&std::fs::read_to_string(&node_config_path)?)?; - let forkpoint_config: ForkpointConfig = - toml::from_str(&std::fs::read_to_string(&forkpoint_config_path)?)?; + + let (forkpoint_config, validators_config) = + get_latest_configs(&forkpoint_config_path, &validators_config_path)?; + let devnet_chain_config_override = if let Some(devnet_override_path) = maybe_devnet_chain_config_override_path { Some(toml::from_str(&std::fs::read_to_string( @@ -141,7 +161,7 @@ impl NodeState { node_config, node_config_path, forkpoint_config, - validators_path, + validators_config, chain_config, secp256k1_identity: secp_key, @@ -149,6 +169,7 @@ impl NodeState { bls12_381_identity: bls_key, forkpoint_path: forkpoint_config_path, + validators_path: validators_config_path, wal_path, ledger_path, triedb_path, @@ -159,9 +180,171 @@ impl NodeState { otel_endpoint_interval, pprof, - manytrace_socket, + reload_handle, }) } + + fn setup_tracing( + manytrace_socket: Option, + ) -> Result<(Box, Option), NodeSetupError> { + if let Some(socket_path) = manytrace_socket { + let extension = std::sync::Arc::new(TracingExtension::new()); + let agent = AgentBuilder::new(socket_path) + .register_tracing(Box::new((*extension).clone())) + .build() + .map_err(|e| NodeSetupError::Custom { + kind: clap::error::ErrorKind::Io, + msg: format!("failed to build manytrace agent: {}", e), + })?; + let (filter, reload_handle) = tracing_subscriber::reload::Layer::new( + tracing_subscriber::EnvFilter::from_default_env(), + ); + let subscriber = tracing_subscriber::Registry::default() + .with(ManytraceLayer::new(extension)) + .with( + FmtLayer::default() + .json() + .with_span_events(FmtSpan::NONE) + .with_current_span(false) + .with_span_list(false) + .with_writer(std::io::stdout) + .with_ansi(false) + .with_filter(filter), + ); + + tracing::subscriber::set_global_default(subscriber)?; + info!("manytrace tracing enabled"); + Ok((Box::new(reload_handle), Some(agent))) + } else { + let (filter, reload_handle) = tracing_subscriber::reload::Layer::new( + tracing_subscriber::EnvFilter::from_default_env(), + ); + + let subscriber = tracing_subscriber::Registry::default().with(filter).with( + FmtLayer::default() + .json() + .with_span_events(FmtSpan::NONE) + .with_current_span(false) + .with_span_list(false) + .with_writer(std::io::stdout) + .with_ansi(false), + ); + + tracing::subscriber::set_global_default(subscriber)?; + Ok((Box::new(reload_handle), None)) + } + } +} + +fn fetch_local_configs( + forkpoint_config_path: &Path, + validators_config_path: &Path, +) -> Result<(ForkpointConfig, ValidatorsConfigType), String> { + let local_forkpoint_config: ForkpointConfig = toml::from_str( + &std::fs::read_to_string(forkpoint_config_path) + .map_err(|_| "failed to read local forkpoint.toml file".to_owned())?, + ) + .map_err(|err| err.to_string())?; + let local_validators_config = ValidatorsConfigType::read_from_path(validators_config_path) + .map_err(|_| "failed to read local validators.toml file".to_owned())?; + + Ok((local_forkpoint_config, local_validators_config)) +} + +fn fetch_remote_configs() -> Result<(ForkpointConfig, ValidatorsConfigType), String> { + let forkpoint_url_str = env::var(REMOTE_FORKPOINT_URL_ENV) + .map_err(|_| format!("{REMOTE_FORKPOINT_URL_ENV} env variable unset"))?; + let remote_forkpoint_url = Url::from_str(&forkpoint_url_str) + .map_err(|err| format!("failed to parse remote forkpoint url: {err}"))?; + + let validators_url_str = env::var(REMOTE_VALIDATORS_URL_ENV) + .map_err(|_| format!("{REMOTE_VALIDATORS_URL_ENV} env variable unset"))?; + let remote_validators_url = Url::from_str(&validators_url_str) + .map_err(|err| format!("failed to parse remote validators url: {err}"))?; + + let client = Client::new(); + + let forkpoint_config_str = client + .get(remote_forkpoint_url) + .send() + .and_then(|forkpoint_response| forkpoint_response.error_for_status()) + .and_then(|valid_forkpoint_response| valid_forkpoint_response.text()) + .map_err(|err| format!("error fetching remote forkpoint config: {err}"))?; + let forkpoint_config = toml::from_str(&forkpoint_config_str) + .map_err(|err| format!("failed to parse remote forkpoint config: {err}"))?; + + let validators_config_str = client + .get(remote_validators_url) + .send() + .and_then(|validators_response| validators_response.error_for_status()) + .and_then(|valid_validators_response| valid_validators_response.text()) + .map_err(|err| format!("error fetching remote validators config: {err}"))?; + let validators_config = ValidatorsConfigType::read_from_str(&validators_config_str) + .map_err(|err| format!("failed to parse remote validators config: {err}"))?; + + Ok((forkpoint_config, validators_config)) +} + +fn get_latest_configs( + forkpoint_config_path: &Path, + validators_config_path: &Path, +) -> Result<(ForkpointConfig, ValidatorsConfigType), NodeSetupError> { + let local_configs = fetch_local_configs(forkpoint_config_path, validators_config_path); + let remote_configs = fetch_remote_configs(); + + if local_configs.is_err() && remote_configs.is_err() { + return Err(NodeSetupError::Custom { + kind: ErrorKind::MissingRequiredArgument, + msg: "failed to fetch local and remote configs".to_owned(), + }); + } + + match local_configs { + Ok((local_forkpoint_config, local_validators_config)) => { + match remote_configs { + Ok((remote_forkpoint_config, remote_validators_config)) => { + let local_forkpoint_round = local_forkpoint_config.high_certificate.round(); + let remote_forkpoint_round = remote_forkpoint_config.high_certificate.round(); + + // if remote config is more recent, use that over local config + if remote_forkpoint_round > local_forkpoint_round { + info!( + ?remote_forkpoint_round, + ?local_forkpoint_round, + "remote forkpoint newer than local forkpoint, using remote configs" + ); + + return Ok((remote_forkpoint_config, remote_validators_config)); + } + + if remote_forkpoint_round < local_forkpoint_round - Round(200) { + // warn user if remote configs are stale + warn!( + ?remote_forkpoint_round, + ?local_forkpoint_round, + "remote forkpoint 200 rounds older than local forkpoint" + ); + } + } + Err(fetch_err) => { + info!( + fetch_err, + "failed to fetch remote configs, using local forkpoint and validators config" + ); + } + } + + Ok((local_forkpoint_config, local_validators_config)) + } + Err(fetch_err) => { + info!( + fetch_err, + "failed to fetch local configs, using remote forkpoint and validators config" + ); + + Ok(remote_configs.unwrap()) + } + } } fn load_secp256k1_keypair(path: &Path, keystore_password: &str) -> Result {