From 1df4480f0ff96ad25261fee0f085ec6d69d7fff3 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Sun, 3 Nov 2024 17:24:08 +0400 Subject: [PATCH 01/10] chore(testing): make separate module for server --- node/testing/src/{server.rs => server/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename node/testing/src/{server.rs => server/mod.rs} (100%) diff --git a/node/testing/src/server.rs b/node/testing/src/server/mod.rs similarity index 100% rename from node/testing/src/server.rs rename to node/testing/src/server/mod.rs From 135493ecd07da4a86fc92be688e080b8da69a12a Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Wed, 6 Nov 2024 14:02:56 +0400 Subject: [PATCH 02/10] fix(testing/server): requests hanging --- Cargo.lock | 154 ++++++++++++++++++++++++--------- node/testing/Cargo.toml | 4 +- node/testing/src/main.rs | 2 +- node/testing/src/server/mod.rs | 11 ++- 4 files changed, 122 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a9c88bc792..7140b81fa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,7 +535,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" dependencies = [ - "http", + "http 0.2.9", "log", "url", ] @@ -571,18 +571,19 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.20" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", - "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.0", + "hyper-util", "itoa", "matchit", "memchr", @@ -594,28 +595,33 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-core" -version = "0.3.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 1.0.1", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -2405,7 +2411,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", "indexmap 2.0.2", "slab", "tokio", @@ -2474,7 +2480,7 @@ dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http", + "http 0.2.9", "httpdate", "mime", "sha1", @@ -2486,7 +2492,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" dependencies = [ - "http", + "http 0.2.9", ] [[package]] @@ -2606,6 +2612,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ 
-2613,15 +2630,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.9", "pin-project-lite", ] [[package]] -name = "http-range-header" -version = "0.3.1" +name = "http-body" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "pin-project-lite", +] [[package]] name = "httparse" @@ -2652,8 +2686,8 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -2665,6 +2699,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2672,12 +2725,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.27", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.0", + "pin-project-lite", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -2756,8 +2824,8 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.27", "log", "rand", "tokio", @@ -3857,7 +3925,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 0.2.9", "httparse", "log", "memchr", @@ -4844,7 +4912,7 @@ dependencies = [ "gloo-utils", "hex", "hkdf", - "hyper", + "hyper 0.14.27", "js-sys", "libc", "libp2p-identity", @@ -5763,9 +5831,9 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-tls", "ipnet", "js-sys", @@ -5779,7 +5847,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", "tokio-native-tls", @@ -6996,6 +7064,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "synstructure" version = "0.12.6" @@ -7293,17 +7367,13 @@ dependencies = [ 
[[package]] name = "tower-http" -version = "0.4.4" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" dependencies = [ "bitflags 2.4.1", "bytes", - "futures-core", - "futures-util", - "http", - "http-body", - "http-range-header", + "http 1.1.0", "pin-project-lite", "tower-layer", "tower-service", @@ -7311,9 +7381,9 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" @@ -7515,7 +7585,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http", + "http 0.2.9", "httparse", "log", "rand", @@ -7847,8 +7917,8 @@ dependencies = [ "futures-channel", "futures-util", "headers", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.27", "log", "mime", "mime_guess", diff --git a/node/testing/Cargo.toml b/node/testing/Cargo.toml index c96457c74c..586281d45f 100644 --- a/node/testing/Cargo.toml +++ b/node/testing/Cargo.toml @@ -24,8 +24,8 @@ rand = "0.8" tokio = { version = "1.26.0" } num_cpus = "1.0" rayon = "1.5" -axum = "0.6" -tower-http = { version = "0.4.4", features = ["cors"] } +axum = "0.7" +tower-http = { version = "0.6", features = ["cors"] } strum = "0.26" strum_macros = "0.26" tracing-log = "0.2.0" diff --git a/node/testing/src/main.rs b/node/testing/src/main.rs index bd51d5a7e0..53b0c694ca 100644 --- a/node/testing/src/main.rs +++ b/node/testing/src/main.rs @@ -66,7 +66,7 @@ impl Command { match self { Self::Server(args) => { - server(args.port); + server(rt, args.port); Ok(()) } Self::ScenariosGenerate(cmd) => { diff --git a/node/testing/src/server/mod.rs b/node/testing/src/server/mod.rs index 23f494a73e..dc6028ec24 100644 --- a/node/testing/src/server/mod.rs +++ b/node/testing/src/server/mod.rs @@ -13,10 +13,12 @@ use axum::{ }; use rand::{rngs::StdRng, Rng, SeedableRng}; use serde::{Deserialize, Serialize}; +use tokio::net::TcpListener; +use tokio::runtime::Runtime; use tokio::sync::{oneshot, Mutex, MutexGuard, OwnedMutexGuard}; use tower_http::cors::CorsLayer; -pub fn server(port: u16) { +pub fn server(rt: Runtime, port: u16) { eprintln!("scenarios path: {}", Scenario::PATH); let state = AppState::new(); @@ -56,9 +58,10 @@ pub fn server(port: u16) { .with_state(state) .layer(cors); - tokio::runtime::Handle::current().block_on(async { - axum::Server::bind(&([0, 0, 0, 0], port).into()) - .serve(app.into_make_service()) + rt.block_on(async { + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port)); + let listener = TcpListener::bind(addr).await.unwrap(); + axum::serve(listener, app.into_make_service()) .await .unwrap(); }); From 3f02fc2d08fad560acbccacd9c8d34b71138c89e Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Thu, 7 Nov 2024 02:41:33 +0400 Subject: [PATCH 03/10] feat(testing/server): expose api to create custom cluster/chain, get seeds and genesis_config for that cluster re: #867 --- node/account/src/secret_key.rs | 2 +- node/native/src/node/builder.rs | 2 - .../transition_frontier_genesis_config.rs | 43 ++++++++- node/testing/src/cluster/config.rs | 12 ++- node/testing/src/cluster/mod.rs | 1 - node/testing/src/cluster/runner/mod.rs | 4 +- node/testing/src/cluster/runner/run.rs | 3 +- 
node/testing/src/node/rust/config.rs | 28 +++--- node/testing/src/scenarios/mod.rs | 2 +- .../basic_connectivity_initial_joining.rs | 3 +- .../basic_connectivity_peer_discovery.rs | 1 - .../scenarios/multi_node/pubsub_advanced.rs | 2 +- .../multi_node/vrf_correct_ledgers.rs | 1 - .../scenarios/multi_node/vrf_correct_slots.rs | 1 - .../vrf_epoch_bounds_correct_ledgers.rs | 1 - .../multi_node/vrf_epoch_bounds_evaluation.rs | 1 - node/testing/src/scenarios/p2p/pubsub.rs | 2 - .../record_replay/block_production.rs | 2 +- .../testing/src/scenarios/simulation/small.rs | 2 +- .../simulation/small_forever_real_time.rs | 2 +- .../basic_connectivity_accept_incoming.rs | 1 - .../basic_connectivity_initial_joining.rs | 1 - .../scenarios/solo_node/sync_to_genesis.rs | 1 - .../solo_node/sync_to_genesis_custom.rs | 1 - node/testing/src/server/mod.rs | 91 +++++++++++++++++++ node/testing/src/server/simulator.rs | 87 ++++++++++++++++++ node/testing/src/simulator/config.rs | 11 ++- node/testing/src/simulator/mod.rs | 23 ++++- node/web/src/node/builder.rs | 1 - p2p/src/p2p_config.rs | 4 - p2p/testing/src/cluster.rs | 1 - 31 files changed, 283 insertions(+), 54 deletions(-) create mode 100644 node/testing/src/server/simulator.rs diff --git a/node/account/src/secret_key.rs b/node/account/src/secret_key.rs index 33f148122b..40912149fd 100644 --- a/node/account/src/secret_key.rs +++ b/node/account/src/secret_key.rs @@ -23,7 +23,7 @@ lazy_static::lazy_static! { // TODO(binier): better way. static ref GENERATED_DETERMINISTIC: Vec = { let mut rng = StdRng::seed_from_u64(0); - (0..1000) + (0..10000) .map(|_| AccountSecretKey::rand_with(&mut rng)) .collect() }; diff --git a/node/native/src/node/builder.rs b/node/native/src/node/builder.rs index dfe21a6406..069021e4d1 100644 --- a/node/native/src/node/builder.rs +++ b/node/native/src/node/builder.rs @@ -4,7 +4,6 @@ use std::{ net::IpAddr, path::Path, sync::Arc, - time::Duration, }; use anyhow::Context; @@ -326,7 +325,6 @@ impl NodeBuilder { identity_pub_key: p2p_sec_key.public_key(), initial_peers, external_addrs, - ask_initial_peers_interval: Duration::from_secs(3600), enabled_channels: ChannelId::iter_all().collect(), peer_discovery: !self.p2p_no_discovery, meshsub: P2pMeshsubConfig { diff --git a/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs b/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs index 4ae9444824..911c138539 100644 --- a/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs +++ b/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs @@ -33,6 +33,7 @@ use crate::{ pub use GenesisConfig as TransitionFrontierGenesisConfig; #[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "kind")] pub enum GenesisConfig { Counts { whales: usize, @@ -78,7 +79,10 @@ pub struct GenesisConfigLoaded { } fn bp_num_delegators(i: usize) -> usize { - (i.saturating_add(1)).checked_mul(2).expect("overflow") + (i.saturating_add(1)) + .checked_mul(2) + .expect("overflow") + .min(32) } #[derive(Debug, thiserror::Error)] @@ -636,6 +640,43 @@ impl PrebuiltGenesisConfig { masks.push(staking_ledger_mask); Ok((masks, load_result)) } + + pub fn from_loaded( + (masks, data): (Vec, GenesisConfigLoaded), + ) -> Result { + let find_mask_by_hash = |hash: &v2::LedgerHash| { + masks + .iter() + .find(|&m| m.clone().merkle_root() == hash.to_field().unwrap()) + .ok_or(()) + }; + let inner_hashes = |mask: &ledger::Mask| { + mask.get_raw_inner_hashes() + .into_iter() + .map(|(idx, hash)| (idx, 
v2::LedgerHash::from_fp(hash))) + .collect() + }; + let genesis_mask = find_mask_by_hash(&data.genesis_ledger_hash)?; + let epoch_data = |hash: v2::LedgerHash, seed: v2::EpochSeed| { + find_mask_by_hash(&hash).map(|mask| PrebuiltGenesisEpochData { + accounts: mask.to_list().into_iter().map(Into::into).collect(), + ledger_hash: hash, + hashes: inner_hashes(mask), + seed, + }) + }; + Ok(Self { + constants: data.constants, + accounts: genesis_mask.to_list().into_iter().map(Into::into).collect(), + ledger_hash: data.genesis_ledger_hash, + hashes: inner_hashes(genesis_mask), + staking_epoch_data: epoch_data( + data.staking_epoch_ledger_hash, + data.staking_epoch_seed, + )?, + next_epoch_data: epoch_data(data.next_epoch_ledger_hash, data.next_epoch_seed)?, + }) + } } #[derive(Debug, Serialize, Deserialize, BinProtRead, BinProtWrite)] diff --git a/node/testing/src/cluster/config.rs b/node/testing/src/cluster/config.rs index 0a815b006e..30c0abb527 100644 --- a/node/testing/src/cluster/config.rs +++ b/node/testing/src/cluster/config.rs @@ -4,11 +4,15 @@ use crate::node::OcamlNodeExecutable; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ClusterConfig { + #[serde(default)] port_range: Option<(u16, u16)>, all_rust_to_rust_use_webrtc: bool, proof_kind: ProofKind, + #[serde(default)] is_replay: bool, + #[serde(default)] use_debugger: bool, + #[serde(default)] ocaml_node_executable: Option, } @@ -21,8 +25,7 @@ pub enum ProofKind { impl Default for ProofKind { fn default() -> Self { - // TODO(binier): change default to `ConstraintsChecked` once - // https://github.com/openmina/openmina/issues/260 is closed + // once it's working, change to Self::ConstraintsChecked Self::Dummy } } @@ -72,6 +75,11 @@ impl ClusterConfig { self.all_rust_to_rust_use_webrtc } + pub fn set_proof_kind(&mut self, kind: ProofKind) -> &mut Self { + self.proof_kind = kind; + self + } + pub fn proof_kind(&self) -> ProofKind { self.proof_kind } diff --git a/node/testing/src/cluster/mod.rs b/node/testing/src/cluster/mod.rs index 69f976b924..0163b855c1 100644 --- a/node/testing/src/cluster/mod.rs +++ b/node/testing/src/cluster/mod.rs @@ -282,7 +282,6 @@ impl Cluster { identity_pub_key: p2p_sec_key.public_key(), initial_peers, external_addrs: vec![], - ask_initial_peers_interval: testing_config.ask_initial_peers_interval, enabled_channels: ChannelId::iter_all().collect(), peer_discovery: true, timeouts: testing_config.timeouts, diff --git a/node/testing/src/cluster/runner/mod.rs b/node/testing/src/cluster/runner/mod.rs index 163fe717e1..c21d37210b 100644 --- a/node/testing/src/cluster/runner/mod.rs +++ b/node/testing/src/cluster/runner/mod.rs @@ -23,7 +23,7 @@ use crate::{ pub struct ClusterRunner<'a> { cluster: &'a mut Cluster, - add_step: Box, + add_step: Box, rng: StdRng, latest_advance_time: Option, } @@ -31,7 +31,7 @@ pub struct ClusterRunner<'a> { impl<'a> ClusterRunner<'a> { pub fn new(cluster: &'a mut Cluster, add_step: F) -> Self where - F: 'a + FnMut(&ScenarioStep), + F: 'a + Send + FnMut(&ScenarioStep), { Self { cluster, diff --git a/node/testing/src/cluster/runner/run.rs b/node/testing/src/cluster/runner/run.rs index ca5aa9c609..057497d8df 100644 --- a/node/testing/src/cluster/runner/run.rs +++ b/node/testing/src/cluster/runner/run.rs @@ -23,12 +23,13 @@ pub struct RunCfg< advance_time: Option, } -#[derive(derive_more::From, Serialize, Deserialize, Debug, Clone)] +#[derive(derive_more::From, Serialize, Deserialize, Debug, Default, Clone)] pub enum RunCfgAdvanceTime { /// Set the range of time in milliseconds, with 
which time will be /// advanced during `run` function execution. Rand(std::ops::RangeInclusive), /// Advance time so that node's time matches the real time. + #[default] Real, } diff --git a/node/testing/src/node/rust/config.rs b/node/testing/src/node/rust/config.rs index 1022004f7e..4754ebad74 100644 --- a/node/testing/src/node/rust/config.rs +++ b/node/testing/src/node/rust/config.rs @@ -1,6 +1,6 @@ use std::fs::File; use std::path::Path; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use node::account::AccountSecretKey; use node::config::DEVNET_CONFIG; @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; use crate::scenario::ListenerNode; -#[derive(Serialize, Deserialize, Debug, Clone, Default)] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] pub enum TestPeerId { /// NOTE This option results a deterministic private key derived from the /// node index in the cluster. Be aware that when interacting with OCaml @@ -26,18 +26,25 @@ pub struct RustNodeTestingConfig { pub initial_time: redux::Timestamp, pub genesis: Arc, pub max_peers: usize, - pub ask_initial_peers_interval: Duration, + #[serde(default)] pub initial_peers: Vec, + #[serde(default)] pub peer_id: TestPeerId, + #[serde(default)] pub snark_worker: Option, + #[serde(default)] pub block_producer: Option, + #[serde(default)] pub timeouts: P2pTimeouts, + #[serde(default)] pub libp2p_port: Option, + #[serde(default)] pub recorder: Recorder, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] pub enum Recorder { + #[default] None, StateWithInputActions, } @@ -54,7 +61,6 @@ impl RustNodeTestingConfig { initial_time: redux::Timestamp::ZERO, genesis: DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(10), initial_peers: Vec::new(), peer_id: TestPeerId::default(), block_producer: None, @@ -70,7 +76,6 @@ impl RustNodeTestingConfig { initial_time: redux::Timestamp::ZERO, genesis: DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(10), initial_peers: Vec::new(), peer_id: TestPeerId::default(), block_producer: None, @@ -86,11 +91,6 @@ impl RustNodeTestingConfig { self } - pub fn ask_initial_peers_interval(mut self, d: Duration) -> Self { - self.ask_initial_peers_interval = d; - self - } - pub fn initial_peers(mut self, v: Vec) -> Self { self.initial_peers = v; self @@ -119,9 +119,3 @@ impl RustNodeTestingConfig { self } } - -impl Default for Recorder { - fn default() -> Self { - Self::None - } -} diff --git a/node/testing/src/scenarios/mod.rs b/node/testing/src/scenarios/mod.rs index 867a1f7464..a959c5cfda 100644 --- a/node/testing/src/scenarios/mod.rs +++ b/node/testing/src/scenarios/mod.rs @@ -168,7 +168,7 @@ impl Scenarios { async fn run(self, cluster: &mut Cluster, add_step: F) where - F: FnMut(&ScenarioStep), + F: Send + FnMut(&ScenarioStep), { let runner = ClusterRunner::new(cluster, add_step); match self { diff --git a/node/testing/src/scenarios/multi_node/basic_connectivity_initial_joining.rs b/node/testing/src/scenarios/multi_node/basic_connectivity_initial_joining.rs index 73eb32db79..9ae698eec0 100644 --- a/node/testing/src/scenarios/multi_node/basic_connectivity_initial_joining.rs +++ b/node/testing/src/scenarios/multi_node/basic_connectivity_initial_joining.rs @@ -44,8 +44,7 @@ impl MultiNodeBasicConnectivityInitialJoining { let node = runner.add_rust_node( RustNodeTestingConfig::devnet_default() .max_peers(MAX_PEERS_PER_NODE) - .initial_peers(vec![seed_node.into()]) - 
.ask_initial_peers_interval(Duration::from_secs(10)), + .initial_peers(vec![seed_node.into()]), ); eprintln!("launch Openmina node, id: {node}, connects to {seed_node}"); diff --git a/node/testing/src/scenarios/multi_node/basic_connectivity_peer_discovery.rs b/node/testing/src/scenarios/multi_node/basic_connectivity_peer_discovery.rs index 771b74a018..fb7936288b 100644 --- a/node/testing/src/scenarios/multi_node/basic_connectivity_peer_discovery.rs +++ b/node/testing/src/scenarios/multi_node/basic_connectivity_peer_discovery.rs @@ -67,7 +67,6 @@ impl MultiNodeBasicConnectivityPeerDiscovery { eprintln!("OCaml nodes should be ready now"); let config = RustNodeTestingConfig::devnet_default() - .ask_initial_peers_interval(Duration::from_secs(3600)) //.with_daemon_json("genesis_ledgers/devnet-full.json") .max_peers(100) .initial_peers( diff --git a/node/testing/src/scenarios/multi_node/pubsub_advanced.rs b/node/testing/src/scenarios/multi_node/pubsub_advanced.rs index 743343873e..d946326885 100644 --- a/node/testing/src/scenarios/multi_node/pubsub_advanced.rs +++ b/node/testing/src/scenarios/multi_node/pubsub_advanced.rs @@ -45,6 +45,6 @@ impl MultiNodePubsubPropagateBlock { recorder: Recorder::StateWithInputActions, }; let mut simulator = Simulator::new(initial_time, config); - simulator.run(&mut runner).await; + simulator.setup_and_run(&mut runner).await; } } diff --git a/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs b/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs index a9576a7ec4..07fd97d8ff 100644 --- a/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs +++ b/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs @@ -37,7 +37,6 @@ impl MultiNodeVrfGetCorrectLedgers { initial_time, genesis: node::config::DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(60 * 60), initial_peers: Vec::new(), peer_id: Default::default(), block_producer: Some(RustNodeBlockProducerTestingConfig { diff --git a/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs b/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs index d0f5b88864..97c0e0a8c7 100644 --- a/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs +++ b/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs @@ -43,7 +43,6 @@ impl MultiNodeVrfGetCorrectSlots { initial_time, genesis: node::config::DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(60 * 60), initial_peers: Vec::new(), peer_id: Default::default(), block_producer: Some(RustNodeBlockProducerTestingConfig { diff --git a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs index d3d3a10be3..886ae23b27 100644 --- a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs +++ b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs @@ -51,7 +51,6 @@ impl MultiNodeVrfEpochBoundsCorrectLedger { initial_time, genesis: node::config::DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(60 * 60), initial_peers: Vec::new(), peer_id: Default::default(), block_producer: None, diff --git a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs index 673d04496c..77eb09bc8f 100644 --- a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs +++ 
b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs @@ -37,7 +37,6 @@ impl MultiNodeVrfEpochBoundsEvaluation { initial_time, genesis: node::config::DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(60 * 60), initial_peers: Vec::new(), peer_id: Default::default(), block_producer: None, diff --git a/node/testing/src/scenarios/p2p/pubsub.rs b/node/testing/src/scenarios/p2p/pubsub.rs index 1e2d1604c3..225e1d5eb5 100644 --- a/node/testing/src/scenarios/p2p/pubsub.rs +++ b/node/testing/src/scenarios/p2p/pubsub.rs @@ -14,7 +14,6 @@ impl P2pReceiveBlock { pub async fn run(self, mut runner: ClusterRunner<'_>) { let config = RustNodeTestingConfig::devnet_default() // make sure it will not ask initial peers - .ask_initial_peers_interval(Duration::from_secs(3600)) .max_peers(1) .initial_peers(vec![hosts::devnet()[0].clone()]); let retransmitter_openmina_node = runner.add_rust_node(config); @@ -27,7 +26,6 @@ impl P2pReceiveBlock { let config = RustNodeTestingConfig::devnet_default() // make sure it will not ask initial peers - .ask_initial_peers_interval(Duration::from_secs(3600)) .max_peers(1) .initial_peers(vec![retransmitter_openmina_node.into()]); let receiver_openmina_node = runner.add_rust_node(config); diff --git a/node/testing/src/scenarios/record_replay/block_production.rs b/node/testing/src/scenarios/record_replay/block_production.rs index d6f5025cfb..0b8e82f098 100644 --- a/node/testing/src/scenarios/record_replay/block_production.rs +++ b/node/testing/src/scenarios/record_replay/block_production.rs @@ -39,7 +39,7 @@ impl RecordReplayBlockProduction { recorder: Recorder::StateWithInputActions, }; let mut simulator = Simulator::new(initial_time, cfg); - simulator.run(&mut runner).await; + simulator.setup_and_run(&mut runner).await; // flush the recorded data. 
node::recorder::Recorder::graceful_shutdown(); diff --git a/node/testing/src/scenarios/simulation/small.rs b/node/testing/src/scenarios/simulation/small.rs index d7bbc879cd..8d77b8b655 100644 --- a/node/testing/src/scenarios/simulation/small.rs +++ b/node/testing/src/scenarios/simulation/small.rs @@ -43,6 +43,6 @@ impl SimulationSmall { recorder: Default::default(), }; let mut simulator = Simulator::new(initial_time, cfg); - simulator.run(&mut runner).await; + simulator.setup_and_run(&mut runner).await; } } diff --git a/node/testing/src/scenarios/simulation/small_forever_real_time.rs b/node/testing/src/scenarios/simulation/small_forever_real_time.rs index 8d77992a2f..1b013a5f49 100644 --- a/node/testing/src/scenarios/simulation/small_forever_real_time.rs +++ b/node/testing/src/scenarios/simulation/small_forever_real_time.rs @@ -43,6 +43,6 @@ impl SimulationSmallForeverRealTime { recorder: Default::default(), }; let mut simulator = Simulator::new(initial_time, cfg); - simulator.run(&mut runner).await; + simulator.setup_and_run(&mut runner).await; } } diff --git a/node/testing/src/scenarios/solo_node/basic_connectivity_accept_incoming.rs b/node/testing/src/scenarios/solo_node/basic_connectivity_accept_incoming.rs index 55bc34059f..626d8b4ad4 100644 --- a/node/testing/src/scenarios/solo_node/basic_connectivity_accept_incoming.rs +++ b/node/testing/src/scenarios/solo_node/basic_connectivity_accept_incoming.rs @@ -35,7 +35,6 @@ impl SoloNodeBasicConnectivityAcceptIncoming { eprintln!("add initial peer: {seed:?}"); } let config = RustNodeTestingConfig::devnet_default() - .ask_initial_peers_interval(Duration::from_secs(3600)) .max_peers(MAX_PEERS_PER_NODE) .initial_peers(initial_peers) .with_peer_id(rand::thread_rng().gen()); diff --git a/node/testing/src/scenarios/solo_node/basic_connectivity_initial_joining.rs b/node/testing/src/scenarios/solo_node/basic_connectivity_initial_joining.rs index b04cd24ac4..cac135ee3c 100644 --- a/node/testing/src/scenarios/solo_node/basic_connectivity_initial_joining.rs +++ b/node/testing/src/scenarios/solo_node/basic_connectivity_initial_joining.rs @@ -35,7 +35,6 @@ impl SoloNodeBasicConnectivityInitialJoining { eprintln!("add initial peer: {seed:?}"); } let config = RustNodeTestingConfig::devnet_default() - .ask_initial_peers_interval(Duration::from_secs(3600)) .max_peers(MAX_PEERS_PER_NODE) .initial_peers(initial_peers); diff --git a/node/testing/src/scenarios/solo_node/sync_to_genesis.rs b/node/testing/src/scenarios/solo_node/sync_to_genesis.rs index 3d6a127fa2..f5ffc5eb6a 100644 --- a/node/testing/src/scenarios/solo_node/sync_to_genesis.rs +++ b/node/testing/src/scenarios/solo_node/sync_to_genesis.rs @@ -47,7 +47,6 @@ impl SoloNodeSyncToGenesis { initial_time, genesis: node::config::DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(60 * 60), initial_peers: Vec::new(), peer_id: Default::default(), snark_worker: None, diff --git a/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs b/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs index 9fb673774f..6e795b221f 100644 --- a/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs +++ b/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs @@ -63,7 +63,6 @@ impl SoloNodeSyncToGenesisCustom { initial_time, genesis: node::config::DEVNET_CONFIG.clone(), max_peers: 100, - ask_initial_peers_interval: Duration::from_secs(60 * 60), initial_peers: Vec::new(), peer_id: Default::default(), block_producer: None, diff --git 
a/node/testing/src/server/mod.rs b/node/testing/src/server/mod.rs index dc6028ec24..e080c5b881 100644 --- a/node/testing/src/server/mod.rs +++ b/node/testing/src/server/mod.rs @@ -1,3 +1,5 @@ +pub mod simulator; + use crate::cluster::{Cluster, ClusterConfig, ClusterNodeId}; use crate::node::NodeTestingConfig; use crate::scenario::{event_details, Scenario, ScenarioId, ScenarioInfo, ScenarioStep}; @@ -11,6 +13,8 @@ use axum::{ routing::{get, post, put}, Json, Router, }; +use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts; +use node::transition_frontier::genesis::{GenesisConfig, PrebuiltGenesisConfig}; use rand::{rngs::StdRng, Rng, SeedableRng}; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; @@ -40,6 +44,8 @@ pub fn server(rt: Runtime, port: u16) { "/:cluster_id/scenarios/reload", post(cluster_scenarios_reload), ) + .route("/:cluster_id/seeds", get(cluster_seeds)) + .route("/:cluster_id/genesis/config", get(cluster_genesis_config)) .route( "/:cluster_id/nodes/events/pending", get(cluster_events_pending), @@ -55,6 +61,7 @@ pub fn server(rt: Runtime, port: u16) { let app = Router::new() .nest("/scenarios", scenarios_router) .nest("/clusters", clusters_router) + .nest("/simulations", simulator::simulations_router()) .with_state(state) .layer(cors); @@ -143,6 +150,27 @@ impl AppState { Ok((id, cluster_guard)) } + pub async fn cluster_create_empty( + &self, + config: ClusterConfig, + ) -> Result<(u16, OwnedMutexGuard), (StatusCode, String)> { + let cluster = Cluster::new(config); + + let mut state = self.lock().await; + let id = loop { + let id = state.rng.gen(); + if !state.clusters.contains_key(&id) { + break id; + } + }; + + let cluster = Arc::new(Mutex::new(cluster)); + let cluster_guard = cluster.clone().try_lock_owned().unwrap(); + state.clusters.insert(id, cluster); + + Ok((id, cluster_guard)) + } + pub async fn cluster_destroy(&self, cluster_id: u16) -> bool { self.lock().await.clusters.remove(&cluster_id).is_some() } @@ -380,6 +408,69 @@ async fn cluster_scenarios_reload( .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())) } +async fn cluster_seeds( + State(state): State, + Path(cluster_id): Path, +) -> Result>, (StatusCode, String)> { + state + .cluster(cluster_id) + .await + .map(|cluster| { + cluster + .nodes_iter() + .filter(|(_, node)| node.config().initial_peers.is_empty()) + .map(|(_, node)| node.dial_addr()) + .collect() + }) + .map(Json) +} + +async fn cluster_genesis_config( + State(state): State, + Path(cluster_id): Path, +) -> Result, (StatusCode, String)> { + let cluster = state.cluster(cluster_id).await?; + let genesis = cluster + .nodes_iter() + .next() + .map(|(_, node)| node.config().genesis.clone()); + let genesis = genesis.ok_or_else(|| { + ( + StatusCode::BAD_REQUEST, + "no nodes in the cluster".to_owned(), + ) + })?; + if let GenesisConfig::Prebuilt(encoded) = &*genesis { + return Ok(encoded.clone().into_owned()); + } + tokio::task::spawn_blocking(move || { + let res = genesis.load().map_err(|err| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to load genesis config. err: {err}"), + ) + })?; + let mut encoded = Vec::new(); + PrebuiltGenesisConfig::from_loaded(res) + .map_err(|_| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "failed to build `PrebuiltGenesisConfig` from loaded data".to_owned(), + ) + })? 
+ .store(&mut encoded) + .map_err(|_| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "failed to encode `PrebuiltGenesisConfig`".to_owned(), + ) + })?; + Ok(encoded) + }) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "join error".to_owned()))? +} + #[derive(Serialize)] struct ClusterNodePendingEvents { node_id: ClusterNodeId, diff --git a/node/testing/src/server/simulator.rs b/node/testing/src/server/simulator.rs new file mode 100644 index 0000000000..a4669979f0 --- /dev/null +++ b/node/testing/src/server/simulator.rs @@ -0,0 +1,87 @@ +use std::{sync::Arc, time::Duration}; + +use axum::{ + extract::{Json, State}, + http::StatusCode, + routing::put, +}; +use openmina_core::channels::oneshot; +use serde::{Deserialize, Serialize}; + +use crate::{ + cluster::ClusterConfig, + scenarios::ClusterRunner, + simulator::{Simulator, SimulatorConfig}, +}; + +use super::AppState; + +pub fn simulations_router() -> axum::Router { + axum::Router::new().route("/", put(simulation_create)) +} + +#[derive(Deserialize)] +struct SimulationCreateArgs { + cluster: ClusterConfig, + simulator: SimulatorConfig, +} + +#[derive(Serialize)] +struct SimulationCreateResponse { + cluster_id: u16, +} + +async fn simulation_create( + State(state): State, + Json(args): Json, +) -> Result, (StatusCode, String)> { + async fn setup( + state: AppState, + args: SimulationCreateArgs, + ) -> Result<(u16, Simulator), (StatusCode, String)> { + let (cluster_id, mut cluster) = state.cluster_create_empty(args.cluster).await?; + + let initial_time = redux::Timestamp::global_now(); + let mut simulator = Simulator::new(initial_time, args.simulator); + simulator + .setup(&mut ClusterRunner::new(&mut cluster, |_| {})) + .await; + Ok((cluster_id, simulator)) + } + let (setup_tx, setup_rx) = oneshot::channel(); + let state_clone = state.clone(); + + std::thread::spawn(move || { + let state = state_clone; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + let (cluster_id, mut simulator) = match setup(state.clone(), args).await { + Err(err) => { + let _ = setup_tx.send(Err(err)); + return; + } + Ok((cluster_id, simulator)) => { + let _ = setup_tx.send(Ok(cluster_id)); + (cluster_id, simulator) + } + }; + let cluster_mutex = match state.cluster_mutex(cluster_id).await { + Err(_) => return, + Ok(cluster_mutex) => Arc::downgrade(&cluster_mutex), + }; + while let Some(cluster_mutex) = cluster_mutex.upgrade() { + let mut cluster = cluster_mutex.lock().await; + let mut runner = ClusterRunner::new(&mut *cluster, |_| {}); + let _ = + tokio::time::timeout(Duration::from_millis(500), simulator.run(&mut runner)) + .await; + } + }); + }); + let cluster_id = setup_rx.await.unwrap()?; + + Ok(SimulationCreateResponse { cluster_id }).map(Json) +} diff --git a/node/testing/src/simulator/config.rs b/node/testing/src/simulator/config.rs index a8b5c2d730..11913b1c90 100644 --- a/node/testing/src/simulator/config.rs +++ b/node/testing/src/simulator/config.rs @@ -12,15 +12,24 @@ pub struct SimulatorConfig { pub normal_nodes: usize, pub snark_workers: usize, pub block_producers: usize, + #[serde(default)] pub advance_time: RunCfgAdvanceTime, + #[serde(default)] pub run_until: SimulatorRunUntil, + #[serde(default = "duration_max")] pub run_until_timeout: Duration, + #[serde(default)] pub recorder: Recorder, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] pub enum SimulatorRunUntil { + #[default] Forever, Epoch(u32), 
BlockchainLength(u32), } + +fn duration_max() -> Duration { + Duration::MAX +} diff --git a/node/testing/src/simulator/mod.rs b/node/testing/src/simulator/mod.rs index a5e4a2b85e..89a9e29dc4 100644 --- a/node/testing/src/simulator/mod.rs +++ b/node/testing/src/simulator/mod.rs @@ -1,5 +1,6 @@ mod config; pub use config::*; + use mina_p2p_messages::v2::{ CurrencyFeeStableV1, UnsignedExtendedUInt64Int64ForVersionTagsStableV1, }; @@ -37,7 +38,6 @@ impl Simulator { initial_time: self.initial_time(), genesis: self.config.genesis.clone(), max_peers: 1000, - ask_initial_peers_interval: Duration::from_secs(60), initial_peers: Vec::new(), peer_id: Default::default(), block_producer: None, @@ -98,6 +98,10 @@ impl Simulator { } async fn set_up_normal_nodes(&mut self, runner: &mut ClusterRunner<'_>) { + if self.config.normal_nodes == 0 { + return; + } + eprintln!("setting up normal nodes: {}", self.config.normal_nodes); let node_config = RustNodeTestingConfig { @@ -114,6 +118,10 @@ impl Simulator { } async fn set_up_snark_worker_nodes(&mut self, runner: &mut ClusterRunner<'_>) { + if self.config.snark_workers == 0 { + return; + } + eprintln!( "setting up rust snark worker nodes: {}", self.config.snark_workers @@ -163,6 +171,10 @@ impl Simulator { } async fn set_up_block_producer_nodes(&mut self, runner: &mut ClusterRunner<'_>) { + if self.config.block_producers == 0 { + return; + } + let block_producers = runner.block_producer_sec_keys(ClusterNodeId::new_unchecked(0)); assert!(self.config.block_producers <= block_producers.len()); @@ -203,12 +215,19 @@ impl Simulator { self.wait_for_all_nodes_synced(runner).await; } - pub async fn run<'a>(&mut self, runner: &mut ClusterRunner<'a>) { + pub async fn setup<'a>(&mut self, runner: &mut ClusterRunner<'a>) { self.set_up_seed_nodes(runner).await; self.set_up_normal_nodes(runner).await; self.set_up_snark_worker_nodes(runner).await; self.set_up_block_producer_nodes(runner).await; + } + pub async fn setup_and_run<'a>(&mut self, runner: &mut ClusterRunner<'a>) { + self.setup(runner).await; + self.run(runner).await; + } + + pub async fn run<'a>(&mut self, runner: &mut ClusterRunner<'a>) { let run_until = self.config.run_until.clone(); let advance_time = self.config.advance_time.clone(); let start_t = redux::Instant::now(); diff --git a/node/web/src/node/builder.rs b/node/web/src/node/builder.rs index c9270a7a45..5942877f60 100644 --- a/node/web/src/node/builder.rs +++ b/node/web/src/node/builder.rs @@ -231,7 +231,6 @@ impl NodeBuilder { identity_pub_key: p2p_sec_key.public_key(), initial_peers, external_addrs: vec![], - ask_initial_peers_interval: Duration::from_secs(3600), enabled_channels: ChannelId::iter_all().collect(), peer_discovery: !self.p2p_no_discovery, meshsub: P2pMeshsubConfig { diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index 9a425a8b7e..dc0be207f8 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -25,10 +25,6 @@ pub struct P2pConfig { /// External addresses pub external_addrs: Vec, - /// The time interval that must elapse before the next peer discovery request. - /// The node periodically polls peers for their connections to keep our list up to date. 
- pub ask_initial_peers_interval: Duration, - pub enabled_channels: BTreeSet, pub timeouts: P2pTimeouts, diff --git a/p2p/testing/src/cluster.rs b/p2p/testing/src/cluster.rs index 671f55887e..454e95f318 100644 --- a/p2p/testing/src/cluster.rs +++ b/p2p/testing/src/cluster.rs @@ -355,7 +355,6 @@ impl Cluster { identity_pub_key: secret_key.public_key(), initial_peers, external_addrs: vec![], - ask_initial_peers_interval: Duration::from_secs(5), enabled_channels: p2p::channels::ChannelId::for_libp2p().collect(), peer_discovery: config.discovery, timeouts: config.timeouts, From 67ba3d483a89eed4220a1ba38faa8d29c2ceff1b Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Fri, 8 Nov 2024 01:29:00 +0400 Subject: [PATCH 04/10] feat(testing/server): expose webnode re: #867 --- Cargo.lock | 17 +++++++++++++++++ node/testing/Cargo.toml | 2 +- node/testing/src/server/mod.rs | 25 +++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 7140b81fa6..7d111810ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2657,6 +2657,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08a397c49fec283e3d6211adbe480be95aae5f304cfb923e9970e08956d5168a" + [[package]] name = "httparse" version = "1.8.0" @@ -7373,10 +7379,21 @@ checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" dependencies = [ "bitflags 2.4.1", "bytes", + "futures-util", "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "http-range-header", + "httpdate", + "mime", + "mime_guess", + "percent-encoding", "pin-project-lite", + "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/node/testing/Cargo.toml b/node/testing/Cargo.toml index 586281d45f..ee0f78b31f 100644 --- a/node/testing/Cargo.toml +++ b/node/testing/Cargo.toml @@ -25,7 +25,7 @@ tokio = { version = "1.26.0" } num_cpus = "1.0" rayon = "1.5" axum = "0.7" -tower-http = { version = "0.6", features = ["cors"] } +tower-http = { version = "0.6", features = ["cors", "fs"] } strum = "0.26" strum_macros = "0.26" tracing-log = "0.2.0" diff --git a/node/testing/src/server/mod.rs b/node/testing/src/server/mod.rs index e080c5b881..66dd715e7d 100644 --- a/node/testing/src/server/mod.rs +++ b/node/testing/src/server/mod.rs @@ -5,8 +5,12 @@ use crate::node::NodeTestingConfig; use crate::scenario::{event_details, Scenario, ScenarioId, ScenarioInfo, ScenarioStep}; use crate::service::PendingEventId; +use std::path::PathBuf; use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use axum::http::header; +use axum::middleware; +use axum::routing::get_service; use axum::{ extract::{Path, State}, http::StatusCode, @@ -21,9 +25,14 @@ use tokio::net::TcpListener; use tokio::runtime::Runtime; use tokio::sync::{oneshot, Mutex, MutexGuard, OwnedMutexGuard}; use tower_http::cors::CorsLayer; +use tower_http::services::ServeDir; pub fn server(rt: Runtime, port: u16) { + let fe_dist_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../../") + .join("frontend/dist/frontend/"); eprintln!("scenarios path: {}", Scenario::PATH); + eprintln!("FrontEnd dist path: {fe_dist_dir:?}"); let state = AppState::new(); @@ -62,6 +71,22 @@ pub fn server(rt: Runtime, port: u16) { .nest("/scenarios", scenarios_router) .nest("/clusters", clusters_router) .nest("/simulations", simulator::simulations_router()) + .fallback( + 
get_service(ServeDir::new(fe_dist_dir)).layer(middleware::from_fn( + |req, next: middleware::Next| async { + let mut resp = next.run(req).await; + resp.headers_mut().insert( + header::HeaderName::from_static("cross-origin-embedder-policy"), + header::HeaderValue::from_static("require-corp"), + ); + resp.headers_mut().insert( + header::HeaderName::from_static("cross-origin-opener-policy"), + header::HeaderValue::from_static("same-origin"), + ); + resp + }, + )), + ) .with_state(state) .layer(cors); From dd746269ece80cd55a3d0e6b65bc2da7f9951307 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Mon, 11 Nov 2024 08:44:45 +0400 Subject: [PATCH 05/10] fix(p2p/webrtc): connection closed by peer, not pruned --- node/src/action_kind.rs | 4 +- node/src/event_source/event_source_effects.rs | 2 +- .../p2p_disconnection_actions.rs | 4 ++ .../p2p_disconnection_reducer.rs | 41 +++++++------------ 4 files changed, 22 insertions(+), 29 deletions(-) diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index ca8823d8c8..d5dc25d10d 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -358,6 +358,7 @@ pub enum ActionKind { P2pConnectionOutgoingEffectfulRandomInit, P2pDisconnectionFinish, P2pDisconnectionInit, + P2pDisconnectionPeerClosed, P2pDisconnectionEffectfulInit, P2pEffectfulInitialize, P2pIdentifyNewRequest, @@ -704,7 +705,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 589; + pub const COUNT: u16 = 590; } impl std::fmt::Display for ActionKind { @@ -1184,6 +1185,7 @@ impl ActionKindGet for P2pDisconnectionAction { fn kind(&self) -> ActionKind { match self { Self::Init { .. } => ActionKind::P2pDisconnectionInit, + Self::PeerClosed { .. } => ActionKind::P2pDisconnectionPeerClosed, Self::Finish { .. } => ActionKind::P2pDisconnectionFinish, } } diff --git a/node/src/event_source/event_source_effects.rs b/node/src/event_source/event_source_effects.rs index de299f9c0e..339ba4e81e 100644 --- a/node/src/event_source/event_source_effects.rs +++ b/node/src/event_source/event_source_effects.rs @@ -197,7 +197,7 @@ pub fn event_source_effects(store: &mut Store, action: EventSourc } }, P2pConnectionEvent::Closed(peer_id) => { - store.dispatch(P2pDisconnectionAction::Finish { peer_id }); + store.dispatch(P2pDisconnectionAction::PeerClosed { peer_id }); } }, P2pEvent::Channel(e) => match e { diff --git a/p2p/src/disconnection/p2p_disconnection_actions.rs b/p2p/src/disconnection/p2p_disconnection_actions.rs index 6a9b011e8d..19874939e3 100644 --- a/p2p/src/disconnection/p2p_disconnection_actions.rs +++ b/p2p/src/disconnection/p2p_disconnection_actions.rs @@ -15,6 +15,9 @@ pub enum P2pDisconnectionAction { peer_id: PeerId, reason: P2pDisconnectionReason, }, + /// Peer disconnection. + #[action_event(fields(display(peer_id)), level = info)] + PeerClosed { peer_id: PeerId }, /// Finish disconnecting from a peer. #[action_event(fields(display(peer_id)), level = debug)] Finish { peer_id: PeerId }, @@ -24,6 +27,7 @@ impl redux::EnablingCondition for P2pDisconnectionAction { fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool { match self { P2pDisconnectionAction::Init { peer_id, .. } + | P2pDisconnectionAction::PeerClosed { peer_id, .. } | P2pDisconnectionAction::Finish { peer_id } => { state.peers.get(peer_id).map_or(false, |peer| { !matches!(peer.status, P2pPeerStatus::Disconnected { .. 
}) diff --git a/p2p/src/disconnection/p2p_disconnection_reducer.rs b/p2p/src/disconnection/p2p_disconnection_reducer.rs index 761205ddbc..aa43664c27 100644 --- a/p2p/src/disconnection/p2p_disconnection_reducer.rs +++ b/p2p/src/disconnection/p2p_disconnection_reducer.rs @@ -55,41 +55,29 @@ impl P2pDisconnectedState { dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id }); Ok(()) } - #[cfg(not(feature = "p2p-libp2p"))] + P2pDisconnectionAction::PeerClosed { peer_id } => { + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id }); + Ok(()) + } P2pDisconnectionAction::Finish { peer_id } => { let Some(peer) = p2p_state.peers.get_mut(&peer_id) else { bug_condition!("Invalid state for: `P2pDisconnectionAction::Finish`"); return Ok(()); }; - peer.status = P2pPeerStatus::Disconnected { time: meta.time() }; - - let (dispatcher, state) = state_context.into_dispatcher_and_state(); - let p2p_state: &P2pState = state.substate()?; - dispatcher.push(P2pPeerAction::Remove { peer_id }); - - if let Some(callback) = &p2p_state.callbacks.on_p2p_disconnection_finish { - dispatcher.push_callback(callback.clone(), peer_id); - } - Ok(()) - } - #[cfg(feature = "p2p-libp2p")] - P2pDisconnectionAction::Finish { peer_id } => { - if p2p_state - .network - .scheduler - .connections - .iter() - .any(|(_addr, conn_state)| { - conn_state.peer_id() == Some(&peer_id) && conn_state.closed.is_none() - }) + if peer.is_libp2p() + && p2p_state + .network + .scheduler + .connections + .iter() + .any(|(_addr, conn_state)| { + conn_state.peer_id() == Some(&peer_id) && conn_state.closed.is_none() + }) { return Ok(()); } - let Some(peer) = p2p_state.peers.get_mut(&peer_id) else { - bug_condition!("Invalid state for: `P2pDisconnectionAction::Finish`"); - return Ok(()); - }; peer.status = P2pPeerStatus::Disconnected { time: meta.time() }; let (dispatcher, state) = state_context.into_dispatcher_and_state(); @@ -99,7 +87,6 @@ impl P2pDisconnectedState { if let Some(callback) = &p2p_state.callbacks.on_p2p_disconnection_finish { dispatcher.push_callback(callback.clone(), peer_id); } - Ok(()) } } From 6178668027c0a920a286ef51bb59b04152f8504a Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Tue, 12 Nov 2024 07:50:07 +0400 Subject: [PATCH 06/10] feat(testing/server): custom cluster webnode full integration re: #867 --- Cargo.lock | 5 +- core/Cargo.toml | 2 + core/src/http.rs | 49 +++++ core/src/lib.rs | 4 + frontend/src/app/app.component.ts | 5 + .../src/app/core/services/web-node.service.ts | 41 ++++- ledger/Cargo.toml | 2 - ledger/src/proofs/circuit_blobs.rs | 51 +----- node/account/src/secret_key.rs | 4 + .../transition_frontier_genesis_config.rs | 11 ++ node/testing/Cargo.toml | 4 +- node/testing/src/cluster/mod.rs | 13 +- node/testing/src/main.rs | 10 +- node/testing/src/server/mod.rs | 168 +++++++++++++----- node/testing/src/server/simulator.rs | 12 +- node/testing/src/server/webnode.rs | 74 ++++++++ node/testing/src/simulator/mod.rs | 4 +- node/web/src/lib.rs | 33 +++- .../p2p_connection_incoming_actions.rs | 1 + p2p/src/connection/outgoing/mod.rs | 11 ++ ...p_connection_outgoing_effectful_effects.rs | 4 +- p2p/src/service_impl/webrtc/mod.rs | 19 +- p2p/src/webrtc/signaling_method/mod.rs | 38 +++- 23 files changed, 439 insertions(+), 126 deletions(-) create mode 100644 core/src/http.rs create mode 100644 node/testing/src/server/webnode.rs diff --git a/Cargo.lock b/Cargo.lock index 7d111810ec..c5dfd00aea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3855,7 
+3855,6 @@ dependencies = [ "getrandom", "hex", "itertools 0.10.5", - "js-sys", "juniper", "kimchi", "lazy_static", @@ -4570,6 +4569,7 @@ dependencies = [ "bs58 0.4.0", "crypto_secretbox", "hex", + "js-sys", "lazy_static", "md5", "mina-hasher", @@ -4590,6 +4590,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "wasm_thread", + "web-sys", ] [[package]] @@ -4736,6 +4737,8 @@ version = "0.11.0" dependencies = [ "anyhow", "axum", + "base64 0.22.0", + "bs58 0.4.0", "clap 4.5.20", "console", "ctrlc", diff --git a/core/Cargo.toml b/core/Cargo.toml index 6cc227e4c4..b9b88d7e71 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -39,6 +39,8 @@ redux = { workspace = true, features=["serializable_callbacks"] } wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" wasm_thread = { version = "0.3", features = [ "es_modules" ] } +js-sys = "0.3" +web-sys = { version = "0.3", features = ["Window", "Response"] } [dev-dependencies] serde_json = { version = "1" } diff --git a/core/src/http.rs b/core/src/http.rs new file mode 100644 index 0000000000..830776cfb4 --- /dev/null +++ b/core/src/http.rs @@ -0,0 +1,49 @@ +#[cfg(target_family = "wasm")] +mod http { + use crate::thread; + use wasm_bindgen::prelude::*; + + fn to_io_err(err: JsValue) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, format!("{err:?}")) + } + + async fn _get_bytes(url: String) -> std::io::Result> { + use wasm_bindgen_futures::JsFuture; + use web_sys::Response; + + // let window = js_sys::global().dyn_into::().unwrap(); + let window = web_sys::window().unwrap(); + + let resp_value = JsFuture::from(window.fetch_with_str(&url)) + .await + .map_err(to_io_err)?; + + assert!(resp_value.is_instance_of::()); + let resp: Response = resp_value.dyn_into().unwrap(); + let js = JsFuture::from(resp.array_buffer().map_err(to_io_err)?) + .await + .map_err(to_io_err)?; + Ok(js_sys::Uint8Array::new(&js).to_vec()) + } + + pub async fn get_bytes(url: &str) -> std::io::Result> { + let url = url.to_owned(); + if thread::is_web_worker_thread() { + thread::run_async_fn_in_main_thread(move || _get_bytes(url)).await.expect("failed to run task in the main thread! Maybe main thread crashed or not initialized?") + } else { + _get_bytes(url).await + } + } + + pub fn get_bytes_blocking(url: &str) -> std::io::Result> { + let url = url.to_owned(); + if thread::is_web_worker_thread() { + thread::run_async_fn_in_main_thread_blocking(move || _get_bytes(url)).expect("failed to run task in the main thread! 
Maybe main thread crashed or not initialized?") + } else { + panic!("can't do blocking requests from main browser thread"); + } + } +} + +#[cfg(target_family = "wasm")] +pub use http::{get_bytes, get_bytes_blocking}; diff --git a/core/src/lib.rs b/core/src/lib.rs index ad9bfe68fa..b426daf5bd 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -2,6 +2,10 @@ pub mod invariants; pub mod log; pub mod requests; +// TODO(binier): refactor +#[cfg(target_family = "wasm")] +pub mod http; + pub mod channels; pub mod thread; diff --git a/frontend/src/app/app.component.ts b/frontend/src/app/app.component.ts index 6eaed1eeb9..8fd2baaa6e 100644 --- a/frontend/src/app/app.component.ts +++ b/frontend/src/app/app.component.ts @@ -42,6 +42,11 @@ export class AppComponent extends StoreDispatcher implements OnInit { } ngOnInit(): void { + const args = new URLSearchParams(window.location.search).get("a"); + if (!!args) { + window.localStorage.setItem("webnodeArgs", args); + } + this.select( getMergedRoute, () => this.initAppFunctionalities(), diff --git a/frontend/src/app/core/services/web-node.service.ts b/frontend/src/app/core/services/web-node.service.ts index 9dc17a8cb8..c072096286 100644 --- a/frontend/src/app/core/services/web-node.service.ts +++ b/frontend/src/app/core/services/web-node.service.ts @@ -15,6 +15,7 @@ export class WebNodeService { private readonly webnode$: BehaviorSubject = new BehaviorSubject(null); private webNodeKeyPair: { publicKey: string, privateKey: string }; + private webNodeNetwork: String; private webNodeStartTime: number; private sentryEvents: any = {}; @@ -46,14 +47,36 @@ export class WebNodeService { loadWasm$(): Observable { this.webNodeStartTime = Date.now(); + const args = (() => { + const raw = window.localStorage.getItem("webnodeArgs"); + if (raw == null) { + return null; + } + return JSON.parse(atob(raw)); + })(); if (isBrowser()) { return merge( of(any(window).webnode).pipe(filter(Boolean)), fromEvent(window, 'webNodeLoaded'), ).pipe( - switchMap(() => this.http.get<{ publicKey: string, privateKey: string }>('assets/webnode/web-node-secrets.json')), + switchMap(() => { + const DEFAULT_NETWORK = "devnet"; + if (!args) { + return this.http.get<{ publicKey: string, privateKey: string }>('assets/webnode/web-node-secrets.json') + .pipe(map(blockProducer => ({blockProducer, network: DEFAULT_NETWORK}))); + } + const data = { network: args["network"] || DEFAULT_NETWORK, blockProducer: {} as any }; + if (!!args["block_producer"]) { + data["blockProducer"] = { + privateKey: args["block_producer"].sec_key, + publicKey: args["block_producer"].pub_key, + }; + } + return of(data); + }), tap(data => { - this.webNodeKeyPair = data; + this.webNodeKeyPair = data.blockProducer; + this.webNodeNetwork = data.network; }), map(() => void 0), ); @@ -68,7 +91,19 @@ export class WebNodeService { switchMap((wasm: any) => from(wasm.default(undefined, new WebAssembly.Memory(this.memory))).pipe(map(() => wasm))), switchMap((wasm) => { this.webnodeProgress$.next('Loaded'); - return from(wasm.run(this.webNodeKeyPair.privateKey)); + const urls = (() => { + if (typeof this.webNodeNetwork == 'number') { + const url = `${window.location.origin}/clusters/${this.webNodeNetwork}/`; + return { + seeds: url + 'seeds', + genesisConfig: url + 'genesis/config' + } + } else { + return {}; + } + })(); + console.log("webnode config:", !!this.webNodeKeyPair.privateKey, this.webNodeNetwork, urls); + return from(wasm.run(this.webNodeKeyPair.privateKey, urls.seeds, urls.genesisConfig)); }), tap((webnode: any) => { 
any(window)['webnode'] = webnode; diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 98d1468f79..7c5c174d0c 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -77,8 +77,6 @@ fraction = { version = "=0.15.1", default-features = false, features = ["with-se getrandom = { version = "0.2", features = ["js"] } wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" -js-sys = "0.3" -web-sys = { version = "0.3", features = ["Window", "Response"] } [target.'cfg(not(target_family = "wasm"))'.dependencies] zstd = { version = "0.12", optional = true } diff --git a/ledger/src/proofs/circuit_blobs.rs b/ledger/src/proofs/circuit_blobs.rs index af19c7dc2a..18558061ef 100644 --- a/ledger/src/proofs/circuit_blobs.rs +++ b/ledger/src/proofs/circuit_blobs.rs @@ -1,52 +1,5 @@ use std::path::Path; -#[cfg(target_family = "wasm")] -mod http { - use openmina_core::thread; - use wasm_bindgen::prelude::*; - - fn to_io_err(err: JsValue) -> std::io::Error { - std::io::Error::new(std::io::ErrorKind::Other, format!("{err:?}")) - } - - async fn _get_bytes(url: String) -> std::io::Result> { - use wasm_bindgen_futures::JsFuture; - use web_sys::Response; - - // let window = js_sys::global().dyn_into::().unwrap(); - let window = web_sys::window().unwrap(); - - let resp_value = JsFuture::from(window.fetch_with_str(&url)) - .await - .map_err(to_io_err)?; - - assert!(resp_value.is_instance_of::()); - let resp: Response = resp_value.dyn_into().unwrap(); - let js = JsFuture::from(resp.array_buffer().map_err(to_io_err)?) - .await - .map_err(to_io_err)?; - Ok(js_sys::Uint8Array::new(&js).to_vec()) - } - - pub async fn get_bytes(url: &str) -> std::io::Result> { - let url = url.to_owned(); - if thread::is_web_worker_thread() { - thread::run_async_fn_in_main_thread(move || _get_bytes(url)).await.expect("failed to run task in the main thread! Maybe main thread crashed or not initialized?") - } else { - _get_bytes(url).await - } - } - - pub fn get_bytes_blocking(url: &str) -> std::io::Result> { - let url = url.to_owned(); - if thread::is_web_worker_thread() { - thread::run_async_fn_in_main_thread_blocking(move || _get_bytes(url)).expect("failed to run task in the main thread! 
Maybe main thread crashed or not initialized?") - } else { - panic!("can't do blocking requests from main browser thread"); - } - } -} - #[cfg(not(target_family = "wasm"))] pub fn home_base_dir() -> Option { let mut path = std::path::PathBuf::from(std::env::var("HOME").ok()?); @@ -130,7 +83,7 @@ pub async fn fetch(filename: &impl AsRef) -> std::io::Result> { let prefix = option_env!("CIRCUIT_BLOBS_HTTP_PREFIX").unwrap_or("/assets/webnode/circuit-blobs"); let url = format!("{prefix}/{}", filename.as_ref().to_str().unwrap()); - http::get_bytes(&url).await + openmina_core::http::get_bytes(&url).await // http::get_bytes(&git_release_url(filename)).await } @@ -139,5 +92,5 @@ pub fn fetch_blocking(filename: &impl AsRef) -> std::io::Result> { let prefix = option_env!("CIRCUIT_BLOBS_HTTP_PREFIX").unwrap_or("/assets/webnode/circuit-blobs"); let url = format!("{prefix}/{}", filename.as_ref().to_str().unwrap()); - http::get_bytes_blocking(&url) + openmina_core::http::get_bytes_blocking(&url) } diff --git a/node/account/src/secret_key.rs b/node/account/src/secret_key.rs index 40912149fd..bdc0ec941e 100644 --- a/node/account/src/secret_key.rs +++ b/node/account/src/secret_key.rs @@ -40,6 +40,10 @@ impl AccountSecretKey { GENERATED_DETERMINISTIC[i as usize].clone() } + pub fn deterministic_iter() -> impl Iterator { + GENERATED_DETERMINISTIC.iter() + } + pub fn max_deterministic_count() -> usize { GENERATED_DETERMINISTIC.len() } diff --git a/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs b/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs index 911c138539..6ae8c089d6 100644 --- a/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs +++ b/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs @@ -139,6 +139,17 @@ impl GenesisConfig { } } + pub fn override_genesis_state_timestamp(&mut self, timestamp: v2::BlockTimeTimeStableV1) { + match self { + Self::Counts { constants, .. } + | Self::BalancesDelegateTable { constants, .. } + | Self::AccountsBinProt { constants, .. } => { + constants.genesis_state_timestamp = timestamp; + } + _ => todo!(), + } + } + pub fn load( &self, ) -> anyhow::Result<(Vec, GenesisConfigLoaded), GenesisConfigError> { diff --git a/node/testing/Cargo.toml b/node/testing/Cargo.toml index ee0f78b31f..8f24a5be9a 100644 --- a/node/testing/Cargo.toml +++ b/node/testing/Cargo.toml @@ -39,6 +39,9 @@ vrf = { workspace = true } time = { version = "0.3", features = ["formatting"] } tracing = { version = "0.1", features = ["std"] } multihash = "0.18.1" +hex = "0.4.3" +bs58 = { version = "0.4" } +base64 = "0.22" nix = { version = "0.27.1", features = ["process", "signal"] } ctrlc = "3.4.2" @@ -51,7 +54,6 @@ openmina-core = { path = "../../core" } node = { path = "../../node" } openmina-node-invariants = { path = "../../node/invariants" } openmina-node-native = { path = "../../node/native" } -hex = "0.4.3" [target.'cfg(not(target_family = "wasm"))'.dependencies] redux = { workspace = true, features=["serializable_callbacks"] } diff --git a/node/testing/src/cluster/mod.rs b/node/testing/src/cluster/mod.rs index 0163b855c1..6cf7084b6d 100644 --- a/node/testing/src/cluster/mod.rs +++ b/node/testing/src/cluster/mod.rs @@ -117,13 +117,6 @@ lazy_static::lazy_static! { static ref VERIFIER_SRS: Arc = get_srs(); } -lazy_static::lazy_static! 
{ - static ref DETERMINISTIC_ACCOUNT_SEC_KEYS: BTreeMap = (0..1000) - .map(AccountSecretKey::deterministic) - .map(|sec_key| (sec_key.public_key(), sec_key)) - .collect(); -} - pub struct Cluster { pub config: ClusterConfig, scenario: ClusterScenarioRun, @@ -193,9 +186,9 @@ impl Cluster { } pub fn get_account_sec_key(&self, pub_key: &AccountPublicKey) -> Option<&AccountSecretKey> { - self.account_sec_keys - .get(pub_key) - .or_else(|| DETERMINISTIC_ACCOUNT_SEC_KEYS.get(pub_key)) + self.account_sec_keys.get(pub_key).or_else(|| { + AccountSecretKey::deterministic_iter().find(|sec_key| &sec_key.public_key() == pub_key) + }) } pub fn set_initial_time(&mut self, initial_time: redux::Timestamp) { diff --git a/node/testing/src/main.rs b/node/testing/src/main.rs index 53b0c694ca..0398fe2dec 100644 --- a/node/testing/src/main.rs +++ b/node/testing/src/main.rs @@ -1,5 +1,6 @@ use clap::Parser; +use node::p2p::webrtc::Host; use openmina_node_testing::cluster::{Cluster, ClusterConfig}; use openmina_node_testing::scenario::Scenario; use openmina_node_testing::scenarios::Scenarios; @@ -24,8 +25,13 @@ pub enum Command { #[derive(Debug, clap::Args)] pub struct CommandServer { - #[arg(long, short, env, default_value = "11000")] + #[arg(long, short, default_value = "127.0.0.1")] + pub host: Host, + + #[arg(long, short, default_value = "11000")] pub port: u16, + #[arg(long, short)] + pub ssl_port: Option, } #[derive(Debug, clap::Args)] @@ -66,7 +72,7 @@ impl Command { match self { Self::Server(args) => { - server(rt, args.port); + server(rt, args.host, args.port, args.ssl_port); Ok(()) } Self::ScenariosGenerate(cmd) => { diff --git a/node/testing/src/server/mod.rs b/node/testing/src/server/mod.rs index 66dd715e7d..102c68e36d 100644 --- a/node/testing/src/server/mod.rs +++ b/node/testing/src/server/mod.rs @@ -1,10 +1,12 @@ pub mod simulator; +pub mod webnode; use crate::cluster::{Cluster, ClusterConfig, ClusterNodeId}; use crate::node::NodeTestingConfig; use crate::scenario::{event_details, Scenario, ScenarioId, ScenarioInfo, ScenarioStep}; use crate::service::PendingEventId; +use std::collections::BTreeSet; use std::path::PathBuf; use std::{collections::BTreeMap, sync::Arc, time::Duration}; @@ -17,8 +19,11 @@ use axum::{ routing::{get, post, put}, Json, Router, }; +use node::account::AccountPublicKey; use node::p2p::connection::outgoing::P2pConnectionOutgoingInitOpts; +use node::p2p::webrtc::{Host, Offer, P2pConnectionResponse, SignalingMethod}; use node::transition_frontier::genesis::{GenesisConfig, PrebuiltGenesisConfig}; +use openmina_node_native::p2p::webrtc::webrtc_signal_send; use rand::{rngs::StdRng, Rng, SeedableRng}; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; @@ -27,14 +32,14 @@ use tokio::sync::{oneshot, Mutex, MutexGuard, OwnedMutexGuard}; use tower_http::cors::CorsLayer; use tower_http::services::ServeDir; -pub fn server(rt: Runtime, port: u16) { +pub fn server(rt: Runtime, host: Host, port: u16, ssl_port: Option) { let fe_dist_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")) .join("../../") .join("frontend/dist/frontend/"); eprintln!("scenarios path: {}", Scenario::PATH); eprintln!("FrontEnd dist path: {fe_dist_dir:?}"); - let state = AppState::new(); + let state = AppState::new(host, ssl_port); let scenarios_router = Router::new() .route("/", get(scenario_list)) @@ -47,12 +52,17 @@ pub fn server(rt: Runtime, port: u16) { .route("/", get(cluster_list)) .route("/create/:scenario_id", put(cluster_create)) .route("/:cluster_id", get(cluster_get)) + 
.nest("/:cluster_id/webnode", webnode::router()) .route("/:cluster_id/run", post(cluster_run)) .route("/:cluster_id/run/auto", post(cluster_run_auto)) .route( "/:cluster_id/scenarios/reload", post(cluster_scenarios_reload), ) + .route( + "/:cluster_id/mina/webrtc/signal/:offer", + get(cluster_webrtc_signal), + ) .route("/:cluster_id/seeds", get(cluster_seeds)) .route("/:cluster_id/genesis/config", get(cluster_genesis_config)) .route( @@ -66,27 +76,24 @@ pub fn server(rt: Runtime, port: u16) { .route("/:cluster_id/destroy", post(cluster_destroy)); let cors = CorsLayer::very_permissive(); + let coop_coep = middleware::from_fn(|req, next: middleware::Next| async { + let mut resp = next.run(req).await; + resp.headers_mut().insert( + header::HeaderName::from_static("cross-origin-embedder-policy"), + header::HeaderValue::from_static("require-corp"), + ); + resp.headers_mut().insert( + header::HeaderName::from_static("cross-origin-opener-policy"), + header::HeaderValue::from_static("same-origin"), + ); + resp + }); let app = Router::new() .nest("/scenarios", scenarios_router) .nest("/clusters", clusters_router) .nest("/simulations", simulator::simulations_router()) - .fallback( - get_service(ServeDir::new(fe_dist_dir)).layer(middleware::from_fn( - |req, next: middleware::Next| async { - let mut resp = next.run(req).await; - resp.headers_mut().insert( - header::HeaderName::from_static("cross-origin-embedder-policy"), - header::HeaderValue::from_static("require-corp"), - ); - resp.headers_mut().insert( - header::HeaderName::from_static("cross-origin-opener-policy"), - header::HeaderValue::from_static("same-origin"), - ); - resp - }, - )), - ) + .fallback(get_service(ServeDir::new(&fe_dist_dir)).layer(coop_coep.clone())) .with_state(state) .layer(cors); @@ -100,15 +107,22 @@ pub fn server(rt: Runtime, port: u16) { } pub struct AppStateInner { + host: Host, + ssl_port: Option, rng: StdRng, clusters: BTreeMap>>, + // TODO(binier): move inside cluster state + locked_block_producer_keys: BTreeMap>, } impl AppStateInner { - pub fn new() -> Self { + pub fn new(host: Host, ssl_port: Option) -> Self { Self { + host, + ssl_port, rng: StdRng::seed_from_u64(0), clusters: Default::default(), + locked_block_producer_keys: Default::default(), } } } @@ -117,8 +131,8 @@ impl AppStateInner { pub struct AppState(Arc>); impl AppState { - pub fn new() -> Self { - Self(Arc::new(Mutex::new(AppStateInner::new()))) + pub fn new(host: Host, ssl_port: Option) -> Self { + Self(Arc::new(Mutex::new(AppStateInner::new(host, ssl_port)))) } pub async fn lock(&self) -> MutexGuard<'_, AppStateInner> { @@ -129,20 +143,14 @@ impl AppState { &self, cluster_id: u16, ) -> Result>, (StatusCode, String)> { - let state = self.lock().await; - state.clusters.get(&cluster_id).cloned().ok_or_else(|| { - ( - StatusCode::BAD_REQUEST, - format!("cluster {cluster_id} not found"), - ) - }) + self.lock().await.cluster_mutex(cluster_id) } pub async fn cluster( &self, cluster_id: u16, ) -> Result, (StatusCode, String)> { - Ok(self.cluster_mutex(cluster_id).await?.lock_owned().await) + self.lock().await.cluster(cluster_id).await } pub async fn cluster_create( @@ -197,7 +205,27 @@ impl AppState { } pub async fn cluster_destroy(&self, cluster_id: u16) -> bool { - self.lock().await.clusters.remove(&cluster_id).is_some() + let mut this = self.lock().await; + this.locked_block_producer_keys.remove(&cluster_id); + this.clusters.remove(&cluster_id).is_some() + } +} + +impl AppStateInner { + fn cluster_mutex(&self, cluster_id: u16) -> Result>, (StatusCode, 
String)> { + self.clusters.get(&cluster_id).cloned().ok_or_else(|| { + ( + StatusCode::BAD_REQUEST, + format!("cluster {cluster_id} not found"), + ) + }) + } + + pub async fn cluster( + &self, + cluster_id: u16, + ) -> Result, (StatusCode, String)> { + Ok(self.cluster_mutex(cluster_id)?.lock_owned().await) } } @@ -433,21 +461,77 @@ async fn cluster_scenarios_reload( .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())) } +async fn cluster_webrtc_signal( + State(state): State, + Path((cluster_id, offer)): Path<(u16, String)>, +) -> Result, (StatusCode, Json)> { + let offer: Offer = Err(()) + .or_else(move |_| { + let json = bs58::decode(&offer).into_vec().or(Err(()))?; + serde_json::from_slice(&json).or(Err(())) + }) + .map_err(|_| { + ( + StatusCode::BAD_REQUEST, + Json(P2pConnectionResponse::SignalDecryptionFailed), + ) + })?; + + let internal_err = || { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(P2pConnectionResponse::InternalError), + ) + }; + + let http_url = { + let cluster = state + .cluster(cluster_id) + .await + .map_err(|_| internal_err())?; + let node = cluster + .node_by_peer_id(offer.target_peer_id) + .ok_or_else(internal_err)?; + let http_url = match node.dial_addr() { + P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. } => signaling.http_url(), + _ => None, + }; + http_url.ok_or_else(internal_err)? + }; + let resp = webrtc_signal_send(&http_url, offer) + .await + .map_err(|_| internal_err())?; + Ok(Json(resp)) +} + async fn cluster_seeds( State(state): State, Path(cluster_id): Path, -) -> Result>, (StatusCode, String)> { - state - .cluster(cluster_id) - .await - .map(|cluster| { - cluster - .nodes_iter() - .filter(|(_, node)| node.config().initial_peers.is_empty()) - .map(|(_, node)| node.dial_addr()) - .collect() - }) - .map(Json) +) -> Result { + let state = state.lock().await; + let host = state.host.clone(); + let ssl_port = state.ssl_port; + state.cluster(cluster_id).await.map(|cluster| { + let list = cluster + .nodes_iter() + .filter(|(_, node)| node.config().initial_peers.is_empty()) + .map(|(_, node)| { + let mut addr = node.dial_addr(); + if let P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. 
} = &mut addr { + if let SignalingMethod::Http(http) = signaling { + if let Some(port) = ssl_port { + http.host = host.clone(); + http.port = port; + *signaling = SignalingMethod::HttpsProxy(cluster_id, http.clone()); + } + } + } + addr = addr.to_string().parse().unwrap(); + addr.to_string() + }) + .collect::>(); + list.join("\n") + }) } async fn cluster_genesis_config( diff --git a/node/testing/src/server/simulator.rs b/node/testing/src/server/simulator.rs index a4669979f0..e033d53012 100644 --- a/node/testing/src/server/simulator.rs +++ b/node/testing/src/server/simulator.rs @@ -5,6 +5,7 @@ use axum::{ http::StatusCode, routing::put, }; +use mina_p2p_messages::v2; use openmina_core::channels::oneshot; use serde::{Deserialize, Serialize}; @@ -24,6 +25,8 @@ pub fn simulations_router() -> axum::Router { struct SimulationCreateArgs { cluster: ClusterConfig, simulator: SimulatorConfig, + #[serde(default)] + override_genesis_state_timestamp: bool, } #[derive(Serialize)] @@ -37,11 +40,18 @@ async fn simulation_create( ) -> Result, (StatusCode, String)> { async fn setup( state: AppState, - args: SimulationCreateArgs, + mut args: SimulationCreateArgs, ) -> Result<(u16, Simulator), (StatusCode, String)> { let (cluster_id, mut cluster) = state.cluster_create_empty(args.cluster).await?; let initial_time = redux::Timestamp::global_now(); + if args.override_genesis_state_timestamp { + Arc::get_mut(&mut args.simulator.genesis) + .unwrap() + .override_genesis_state_timestamp(v2::BlockTimeTimeStableV1( + (u64::from(initial_time) / 1_000_000).into(), + )); + } let mut simulator = Simulator::new(initial_time, args.simulator); simulator .setup(&mut ClusterRunner::new(&mut cluster, |_| {})) diff --git a/node/testing/src/server/webnode.rs b/node/testing/src/server/webnode.rs new file mode 100644 index 0000000000..6fde723bba --- /dev/null +++ b/node/testing/src/server/webnode.rs @@ -0,0 +1,74 @@ +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::Redirect, + routing::get, +}; +use rand::prelude::*; + +use crate::scenarios::ClusterRunner; + +use super::AppState; + +pub fn router() -> axum::Router { + axum::Router::new() + .route("/", get(webnode_get)) + .route("/lock-random-bp", get(webnode_lock_random_bp)) +} + +async fn webnode_get( + State(state): State, + Path(cluster_id): Path, +) -> Result { + use base64::{engine::general_purpose::URL_SAFE, Engine as _}; + // make sure cluster exists + state.cluster_mutex(cluster_id).await?; + + let args = serde_json::json!({ + "network": cluster_id, + }) + .to_string(); + let args = URL_SAFE.encode(&args); + + Ok(Redirect::temporary(&format!("/?a={args}"))) +} + +async fn webnode_lock_random_bp( + State(state): State, + Path(cluster_id): Path, +) -> Result { + use base64::{engine::general_purpose::URL_SAFE, Engine as _}; + let mut state_guard = state.lock().await; + let state = &mut *state_guard; + let mut cluster = state.cluster(cluster_id).await?; + let runner = ClusterRunner::new(&mut cluster, |_| {}); + let locked_keys = state + .locked_block_producer_keys + .entry(cluster_id) + .or_default(); + + let (sec_key, _) = runner + .block_producer_sec_keys(runner.nodes_iter().next().unwrap().0) + .into_iter() + .filter(|(key, _)| !locked_keys.contains(&key.public_key())) + .choose(&mut state.rng) + .ok_or_else(|| { + ( + StatusCode::NOT_ACCEPTABLE, + "no more block producer keys available!".to_owned(), + ) + })?; + locked_keys.insert(sec_key.public_key()); + + let args = serde_json::json!({ + "network": cluster_id, + "block_producer": { + "sec_key": 
sec_key.to_string(), + "pub_key": sec_key.public_key().to_string(), + }, + }) + .to_string(); + let args = URL_SAFE.encode(&args); + + Ok(Redirect::temporary(&format!("/?a={args}"))) +} diff --git a/node/testing/src/simulator/mod.rs b/node/testing/src/simulator/mod.rs index 89a9e29dc4..aa9540eb52 100644 --- a/node/testing/src/simulator/mod.rs +++ b/node/testing/src/simulator/mod.rs @@ -19,6 +19,7 @@ use crate::{ pub struct Simulator { initial_time: redux::Timestamp, config: SimulatorConfig, + start_t: Option, } impl Simulator { @@ -26,6 +27,7 @@ impl Simulator { Self { initial_time, config, + start_t: None, } } @@ -230,7 +232,7 @@ impl Simulator { pub async fn run<'a>(&mut self, runner: &mut ClusterRunner<'a>) { let run_until = self.config.run_until.clone(); let advance_time = self.config.advance_time.clone(); - let start_t = redux::Instant::now(); + let start_t = *self.start_t.get_or_insert_with(redux::Instant::now); let mut last_printed_slot = 0; let virtual_initial_time = self.initial_time(); diff --git a/node/web/src/lib.rs b/node/web/src/lib.rs index d7c4c10946..02563f467d 100644 --- a/node/web/src/lib.rs +++ b/node/web/src/lib.rs @@ -1,5 +1,6 @@ #![cfg(target_family = "wasm")] +use ::node::transition_frontier::genesis::GenesisConfig; pub use openmina_node_common::*; mod rayon; @@ -31,7 +32,11 @@ fn main() { } #[wasm_bindgen] -pub async fn run(block_producer: Option) -> RpcSender { +pub async fn run( + block_producer: Option, + seed_nodes_url: Option, + genesis_config_url: Option, +) -> RpcSender { let block_producer: Option = block_producer.map(|key| { key.parse() .expect("failed to parse passed block producer keys") @@ -40,7 +45,7 @@ pub async fn run(block_producer: Option) -> RpcSender { let (rpc_sender_tx, rpc_sender_rx) = ::node::core::channels::oneshot::channel(); let _ = thread::spawn(move || { wasm_bindgen_futures::spawn_local(async move { - let mut node = setup_node(block_producer).await; + let mut node = setup_node(block_producer, seed_nodes_url, genesis_config_url).await; let _ = rpc_sender_tx.send(node.rpc()); node.run_forever().await; }); @@ -53,16 +58,38 @@ pub async fn run(block_producer: Option) -> RpcSender { async fn setup_node( block_producer: Option, + seed_nodes_url: Option, + genesis_config_url: Option, ) -> openmina_node_common::Node { let block_verifier_index = BlockVerifier::make().await; let work_verifier_index = TransactionVerifier::make().await; - let genesis_config = ::node::config::DEVNET_CONFIG.clone(); + let genesis_config = if let Some(genesis_config_url) = genesis_config_url { + let bytes = ::node::core::http::get_bytes(&genesis_config_url) + .await + .expect("failed to fetch genesis config"); + GenesisConfig::Prebuilt(bytes.into()).into() + } else { + ::node::config::DEVNET_CONFIG.clone() + }; + let mut node_builder: NodeBuilder = NodeBuilder::new(None, genesis_config); node_builder .block_verifier_index(block_verifier_index.clone()) .work_verifier_index(work_verifier_index.clone()); + // TODO(binier): refactor + if let Some(seed_nodes_url) = seed_nodes_url { + let peers = ::node::core::http::get_bytes(&seed_nodes_url) + .await + .expect("failed to fetch seed nodes"); + node_builder.initial_peers( + String::from_utf8_lossy(&peers) + .split("\n") + .map(|s| s.parse().expect("failed to parse seed node addr")), + ); + } + if let Some(bp_key) = block_producer { thread::spawn(move || { BlockProver::make(Some(block_verifier_index), Some(work_verifier_index)); diff --git a/p2p/src/connection/incoming/p2p_connection_incoming_actions.rs 
b/p2p/src/connection/incoming/p2p_connection_incoming_actions.rs index 00e6f45710..aefdc64f4c 100644 --- a/p2p/src/connection/incoming/p2p_connection_incoming_actions.rs +++ b/p2p/src/connection/incoming/p2p_connection_incoming_actions.rs @@ -56,6 +56,7 @@ pub enum P2pConnectionIncomingAction { peer_id: PeerId, }, /// Error establishing incoming connection. + #[action_event(level = warn, fields(display(peer_id), display(error)))] Error { peer_id: PeerId, error: P2pConnectionIncomingError, diff --git a/p2p/src/connection/outgoing/mod.rs b/p2p/src/connection/outgoing/mod.rs index e9b6bdf6ea..58dbb7d75d 100644 --- a/p2p/src/connection/outgoing/mod.rs +++ b/p2p/src/connection/outgoing/mod.rs @@ -278,6 +278,17 @@ impl P2pConnectionOutgoingInitOpts { (*peer_id).to_string().into_bytes().into(), ), }), + SignalingMethod::HttpsProxy(cluster_id, info) => { + Some(v2::NetworkPeerPeerStableV1 { + host: format!("https://{}/clusters/{cluster_id}", info.host) + .as_bytes() + .into(), + libp2p_port: (info.port as u64).into(), + peer_id: v2::NetworkPeerPeerIdStableV1( + (*peer_id).to_string().into_bytes().into(), + ), + }) + } SignalingMethod::P2p { .. } => None, }, } diff --git a/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs b/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs index 435ae8408d..c36bf8f7a2 100644 --- a/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs +++ b/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs @@ -55,7 +55,9 @@ impl P2pConnectionOutgoingEffectfulAction { _ => return, }; match signaling_method { - webrtc::SignalingMethod::Http(_) | webrtc::SignalingMethod::Https(_) => { + webrtc::SignalingMethod::Http(_) + | webrtc::SignalingMethod::Https(_) + | webrtc::SignalingMethod::HttpsProxy(_, _) => { let Some(url) = signaling_method.http_url() else { return; }; diff --git a/p2p/src/service_impl/webrtc/mod.rs b/p2p/src/service_impl/webrtc/mod.rs index 8704d4396f..8b669f9412 100644 --- a/p2p/src/service_impl/webrtc/mod.rs +++ b/p2p/src/service_impl/webrtc/mod.rs @@ -28,13 +28,20 @@ use crate::{ }; #[cfg(not(target_arch = "wasm32"))] -use self::native::{ - webrtc_signal_send, RTCChannel, RTCConnection, RTCConnectionState, RTCSignalingError, -}; +mod imports { + pub use super::native::{ + webrtc_signal_send, RTCChannel, RTCConnection, RTCConnectionState, RTCSignalingError, + }; +} #[cfg(target_arch = "wasm32")] -use self::web::{ - webrtc_signal_send, RTCChannel, RTCConnection, RTCConnectionState, RTCSignalingError, -}; +mod imports { + pub use super::web::{ + webrtc_signal_send, RTCChannel, RTCConnection, RTCConnectionState, RTCSignalingError, + }; +} + +use imports::*; +pub use imports::{webrtc_signal_send, RTCSignalingError}; use super::TaskSpawner; diff --git a/p2p/src/webrtc/signaling_method/mod.rs b/p2p/src/webrtc/signaling_method/mod.rs index ff09d5a7d8..73ae4e23d9 100644 --- a/p2p/src/webrtc/signaling_method/mod.rs +++ b/p2p/src/webrtc/signaling_method/mod.rs @@ -13,13 +13,17 @@ use crate::PeerId; pub enum SignalingMethod { Http(HttpSignalingInfo), Https(HttpSignalingInfo), - P2p { relay_peer_id: PeerId }, + /// Proxy used as an SSL gateway to the actual signaling server. + HttpsProxy(u16, HttpSignalingInfo), + P2p { + relay_peer_id: PeerId, + }, } impl SignalingMethod { pub fn can_connect_directly(&self) -> bool { match self { - Self::Http(_) | Self::Https(_) => true, + Self::Http(_) | Self::Https(_) | Self::HttpsProxy(_, _) => true, Self::P2p { .. 
} => false, } } @@ -30,6 +34,12 @@ impl SignalingMethod { let (http, info) = match self { Self::Http(info) => ("http", info), Self::Https(info) => ("https", info), + Self::HttpsProxy(cluster_id, info) => { + return Some(format!( + "https://{}:{}/clusters/{}/mina/webrtc/signal", + info.host, info.port, cluster_id + )); + } _ => return None, }; Some(format!( @@ -57,6 +67,10 @@ impl fmt::Display for SignalingMethod { write!(f, "/https")?; signaling.fmt(f) } + Self::HttpsProxy(cluster_id, signaling) => { + write!(f, "/https_proxy/{cluster_id}")?; + signaling.fmt(f) + } Self::P2p { relay_peer_id } => { write!(f, "/p2p/{relay_peer_id}") } @@ -70,6 +84,8 @@ pub enum SignalingMethodParseError { NotEnoughArgs, #[error("unknown signaling method: `{0}`")] UnknownSignalingMethod(String), + #[error("invalid cluster id")] + InvalidClusterId, #[error("host parse error: {0}")] HostParseError(String), #[error("host parse error: {0}")] @@ -90,9 +106,23 @@ impl FromStr for SignalingMethod { .filter(|i| s.len() > *i) .ok_or(SignalingMethodParseError::NotEnoughArgs)?; + let rest = &s[method_end_index..]; match &s[1..method_end_index] { - "http" => Ok(Self::Http(s[method_end_index..].parse()?)), - "https" => Ok(Self::Https(s[method_end_index..].parse()?)), + "http" => Ok(Self::Http(rest.parse()?)), + "https" => Ok(Self::Https(rest.parse()?)), + "https_proxy" => { + let mut iter = rest.splitn(3, '/').filter(|v| !v.trim().is_empty()); + let (cluster_id, rest) = ( + iter.next() + .ok_or(SignalingMethodParseError::NotEnoughArgs)?, + iter.next() + .ok_or(SignalingMethodParseError::NotEnoughArgs)?, + ); + let cluster_id: u16 = cluster_id + .parse() + .or(Err(SignalingMethodParseError::InvalidClusterId))?; + Ok(Self::HttpsProxy(cluster_id, rest.parse()?)) + } method => Err(SignalingMethodParseError::UnknownSignalingMethod( method.to_owned(), )), From f784ccc09f4e7a658a1900bb1e32942efc0d61ea Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Tue, 12 Nov 2024 07:50:19 +0400 Subject: [PATCH 07/10] feat(node): delay genesis proof production as much as possible, to maybe avoid it if possible --- node/src/action_kind.rs | 4 +- .../block_producer/block_producer_actions.rs | 17 +++++-- .../block_producer/block_producer_reducer.rs | 48 ++++++++++--------- .../block_producer/block_producer_state.rs | 2 +- node/src/effects.rs | 2 + node/src/state.rs | 26 +++++----- .../transition_frontier_genesis_actions.rs | 2 +- .../sync/transition_frontier_sync_actions.rs | 2 +- .../transition_frontier_actions.rs | 24 ++++++---- .../transition_frontier_effects.rs | 11 ++++- .../transition_frontier_reducer.rs | 19 ++++++-- 11 files changed, 103 insertions(+), 54 deletions(-) diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index d5dc25d10d..cee7438a5d 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -616,6 +616,7 @@ pub enum ActionKind { TransactionPoolVerifyError, TransactionPoolEffectfulFetchAccounts, TransitionFrontierGenesisInject, + TransitionFrontierGenesisProvenInject, TransitionFrontierSyncFailed, TransitionFrontierSynced, TransitionFrontierGenesisLedgerLoadInit, @@ -705,7 +706,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 590; + pub const COUNT: u16 = 591; } impl std::fmt::Display for ActionKind { @@ -859,6 +860,7 @@ impl ActionKindGet for TransitionFrontierAction { Self::GenesisEffect(a) => a.kind(), Self::Sync(a) => a.kind(), Self::GenesisInject => ActionKind::TransitionFrontierGenesisInject, + Self::GenesisProvenInject => 
ActionKind::TransitionFrontierGenesisProvenInject, Self::Synced { .. } => ActionKind::TransitionFrontierSynced, Self::SyncFailed { .. } => ActionKind::TransitionFrontierSyncFailed, } diff --git a/node/src/block_producer/block_producer_actions.rs b/node/src/block_producer/block_producer_actions.rs index 5b33c7f7cf..7f0cd7f519 100644 --- a/node/src/block_producer/block_producer_actions.rs +++ b/node/src/block_producer/block_producer_actions.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use ledger::scan_state::transaction_logic::valid; use mina_p2p_messages::v2::MinaBaseProofStableV2; use openmina_core::block::ArcBlockWithHash; @@ -100,9 +102,18 @@ impl redux::EnablingCondition for BlockProducerAction { BlockProducerAction::WonSlotWait => state .block_producer .with(false, |this| this.current.won_slot_should_wait(time)), - BlockProducerAction::WonSlotProduceInit { .. } => state - .block_producer - .with(false, |this| this.current.won_slot_should_produce(time)), + BlockProducerAction::WonSlotProduceInit { .. } => { + state.block_producer.with(false, |this| { + let has_genesis_proven_if_needed = || { + state.transition_frontier.best_tip().map_or(false, |tip| { + let proven_block = state.transition_frontier.genesis.proven_block(); + !tip.is_genesis() + || proven_block.map_or(false, |b| Arc::ptr_eq(&b.block, &tip.block)) + }) + }; + this.current.won_slot_should_produce(time) && has_genesis_proven_if_needed() + }) + } BlockProducerAction::WonSlotTransactionsGet => { state.block_producer.with(false, |this| { matches!( diff --git a/node/src/block_producer/block_producer_reducer.rs b/node/src/block_producer/block_producer_reducer.rs index f9cca60e45..4c3165cb27 100644 --- a/node/src/block_producer/block_producer_reducer.rs +++ b/node/src/block_producer/block_producer_reducer.rs @@ -104,6 +104,31 @@ impl BlockProducerEnabled { }; } } + BlockProducerAction::WonSlotProduceInit => { + if let Some(won_slot) = state.current.won_slot() { + if let Some(chain) = best_chain.last().map(|best_tip| { + if best_tip.global_slot() == won_slot.global_slot() { + // We are producing block which replaces current best tip + // instead of extending it. + best_chain + .get(..best_chain.len().saturating_sub(1)) + .unwrap_or(&[]) + .to_vec() + } else { + best_chain.to_vec() + } + }) { + state.current = BlockProducerCurrentState::WonSlotProduceInit { + time: meta.time(), + won_slot: won_slot.clone(), + chain, + }; + }; + } + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(BlockProducerAction::WonSlotTransactionsGet); + } BlockProducerAction::WonSlotTransactionsGet => { let BlockProducerCurrentState::WonSlotProduceInit { won_slot, chain, .. 
@@ -141,29 +166,6 @@ impl BlockProducerEnabled { let dispatcher = state_context.into_dispatcher(); dispatcher.push(BlockProducerAction::StagedLedgerDiffCreateInit); } - BlockProducerAction::WonSlotProduceInit => { - if let Some(won_slot) = state.current.won_slot() { - if let Some(chain) = best_chain.last().map(|best_tip| { - if best_tip.global_slot() == won_slot.global_slot() { - best_chain - .get(..best_chain.len().saturating_sub(1)) - .unwrap_or(&[]) - .to_vec() - } else { - best_chain.to_vec() - } - }) { - state.current = BlockProducerCurrentState::WonSlotProduceInit { - time: meta.time(), - won_slot: won_slot.clone(), - chain, - }; - }; - } - - let dispatcher = state_context.into_dispatcher(); - dispatcher.push(BlockProducerAction::WonSlotTransactionsGet); - } BlockProducerAction::StagedLedgerDiffCreateInit => { let dispatcher = state_context.into_dispatcher(); dispatcher.push(BlockProducerEffectfulAction::StagedLedgerDiffCreateInit); diff --git a/node/src/block_producer/block_producer_state.rs b/node/src/block_producer/block_producer_state.rs index 384d1ad8c2..b16d1bd1d4 100644 --- a/node/src/block_producer/block_producer_state.rs +++ b/node/src/block_producer/block_producer_state.rs @@ -296,7 +296,7 @@ impl BlockProducerCurrentState { #[cfg(not(target_arch = "wasm32"))] const BLOCK_PRODUCTION_ESTIMATE: u64 = Duration::from_secs(5).as_nanos() as u64; #[cfg(target_arch = "wasm32")] - const BLOCK_PRODUCTION_ESTIMATE: u64 = Duration::from_secs(20).as_nanos() as u64; + const BLOCK_PRODUCTION_ESTIMATE: u64 = Duration::from_secs(18).as_nanos() as u64; let slot_interval = Duration::from_secs(3 * 60).as_nanos() as u64; match self { diff --git a/node/src/effects.rs b/node/src/effects.rs index 88c051216a..8aad7a94ab 100644 --- a/node/src/effects.rs +++ b/node/src/effects.rs @@ -37,6 +37,8 @@ pub fn effects(store: &mut Store, action: ActionWithMeta) { store.dispatch(TransitionFrontierGenesisAction::LedgerLoadInit); store.dispatch(ExternalSnarkWorkerAction::Start); + store.dispatch(TransitionFrontierGenesisAction::ProveInit); + if store.state().p2p.ready().is_some() { p2p_request_best_tip_if_needed(store); p2p_request_transactions_if_needed(store); diff --git a/node/src/state.rs b/node/src/state.rs index 9b5049af1b..6dba81f5dd 100644 --- a/node/src/state.rs +++ b/node/src/state.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2, MinaBlockBlockStableV2}; use rand::prelude::*; @@ -333,17 +334,20 @@ impl State { ) } - pub fn should_produce_blocks_after_genesis(&self) -> bool { - self.block_producer.is_enabled() - && self.genesis_block().map_or(false, |b| { - let slot = &b.consensus_state().curr_global_slot_since_hard_fork; - let epoch = slot - .slot_number - .as_u32() - .checked_div(slot.slots_per_epoch.as_u32()) - .expect("division by 0"); - self.current_epoch() <= Some(epoch) - }) + pub fn producing_block_after_genesis(&self) -> bool { + #[allow(clippy::arithmetic_side_effects)] + let two_mins_in_future = self.time() + Duration::from_secs(2 * 60); + self.block_producer.with(false, |bp| { + bp.current.won_slot_should_produce(two_mins_in_future) + }) && self.genesis_block().map_or(false, |b| { + let slot = &b.consensus_state().curr_global_slot_since_hard_fork; + let epoch = slot + .slot_number + .as_u32() + .checked_div(slot.slots_per_epoch.as_u32()) + .expect("division by 0"); + self.current_epoch() <= Some(epoch) + }) } pub fn should_log_node_id(&self) -> bool { diff --git 
a/node/src/transition_frontier/genesis/transition_frontier_genesis_actions.rs b/node/src/transition_frontier/genesis/transition_frontier_genesis_actions.rs index a53ab8c993..94ef1405e1 100644 --- a/node/src/transition_frontier/genesis/transition_frontier_genesis_actions.rs +++ b/node/src/transition_frontier/genesis/transition_frontier_genesis_actions.rs @@ -51,7 +51,7 @@ impl redux::EnablingCondition for TransitionFrontierGenesisAction TransitionFrontierGenesisState::LedgerLoadSuccess { .. } ), TransitionFrontierGenesisAction::ProveInit => { - state.should_produce_blocks_after_genesis() + state.producing_block_after_genesis() && matches!( genesis_state, TransitionFrontierGenesisState::Produced { .. } diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs b/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs index c2c37d4857..cb686091d7 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_actions.rs @@ -146,7 +146,7 @@ impl redux::EnablingCondition for TransitionFrontierSyncAction { && state .transition_frontier .best_tip() - .map_or(true, |tip| best_tip.hash != tip.hash) + .map_or(false, |tip| best_tip.hash != tip.hash) && state .transition_frontier .sync diff --git a/node/src/transition_frontier/transition_frontier_actions.rs b/node/src/transition_frontier/transition_frontier_actions.rs index bdd6c4f73e..679cafa3dc 100644 --- a/node/src/transition_frontier/transition_frontier_actions.rs +++ b/node/src/transition_frontier/transition_frontier_actions.rs @@ -1,4 +1,5 @@ use std::collections::BTreeSet; +use std::sync::Arc; use mina_p2p_messages::v2::StateHash; use openmina_core::block::ArcBlockWithHash; @@ -25,6 +26,8 @@ pub enum TransitionFrontierAction { /// block, otherwise we don't need it so we use dummy proof instead. #[action_event(level = info)] GenesisInject, + #[action_event(level = info)] + GenesisProvenInject, Sync(TransitionFrontierSyncAction), /// Transition frontier synced. @@ -44,15 +47,20 @@ impl redux::EnablingCondition for TransitionFrontierAction { TransitionFrontierAction::Genesis(a) => a.is_enabled(state, time), TransitionFrontierAction::GenesisEffect(a) => a.is_enabled(state, time), TransitionFrontierAction::GenesisInject => { - if state.transition_frontier.best_tip().is_some() { + state.transition_frontier.root().is_none() + && state + .transition_frontier + .genesis + .block_with_real_or_dummy_proof() + .is_some() + } + TransitionFrontierAction::GenesisProvenInject => { + let Some(genesis) = state.transition_frontier.genesis.proven_block() else { return false; - } - let genesis_state = &state.transition_frontier.genesis; - if state.should_produce_blocks_after_genesis() { - genesis_state.proven_block().is_some() - } else { - genesis_state.block_with_dummy_proof().is_some() - } + }; + state.transition_frontier.root().map_or(true, |b| { + b.is_genesis() && !Arc::ptr_eq(&genesis.block, &b.block) + }) } TransitionFrontierAction::Sync(a) => a.is_enabled(state, time), TransitionFrontierAction::Synced { .. 
} => matches!( diff --git a/node/src/transition_frontier/transition_frontier_effects.rs b/node/src/transition_frontier/transition_frontier_effects.rs index 9597d639f9..c8b2ae97fe 100644 --- a/node/src/transition_frontier/transition_frontier_effects.rs +++ b/node/src/transition_frontier/transition_frontier_effects.rs @@ -34,10 +34,12 @@ pub fn transition_frontier_effects( // TODO(refactor): this should be handled by a callback and removed from here // whenever any of these is going to happen, genesisinject must happen first match &a { - TransitionFrontierGenesisAction::Produce - | TransitionFrontierGenesisAction::ProveSuccess { .. } => { + TransitionFrontierGenesisAction::Produce => { store.dispatch(TransitionFrontierAction::GenesisInject); } + TransitionFrontierGenesisAction::ProveSuccess { .. } => { + store.dispatch(TransitionFrontierAction::GenesisProvenInject); + } _ => {} } } @@ -47,6 +49,11 @@ pub fn transition_frontier_effects( TransitionFrontierAction::GenesisInject => { synced_effects(&meta, store); } + TransitionFrontierAction::GenesisProvenInject => { + if store.state().transition_frontier.sync.is_synced() { + synced_effects(&meta, store); + } + } TransitionFrontierAction::Sync(a) => { match a { TransitionFrontierSyncAction::Init { diff --git a/node/src/transition_frontier/transition_frontier_reducer.rs b/node/src/transition_frontier/transition_frontier_reducer.rs index 420a447c83..559e2f7e6d 100644 --- a/node/src/transition_frontier/transition_frontier_reducer.rs +++ b/node/src/transition_frontier/transition_frontier_reducer.rs @@ -34,9 +34,22 @@ impl TransitionFrontierState { block: genesis, just_emitted_a_proof: true, }; - let new_chain = vec![genesis]; - state.chain_diff = state.maybe_make_chain_diff(&new_chain); - state.best_chain = new_chain; + state.best_chain = vec![genesis]; + state.sync = TransitionFrontierSyncState::Synced { time: meta.time() }; + } + TransitionFrontierAction::GenesisProvenInject => { + let Some(genesis) = state.genesis.proven_block() else { + return; + }; + if let Some(block) = state.best_chain.get_mut(0) { + block.block = genesis.clone(); + } else { + let genesis = AppliedBlock { + block: genesis.clone(), + just_emitted_a_proof: true, + }; + state.best_chain = vec![genesis]; + } if !state.sync.is_pending() { state.sync = TransitionFrontierSyncState::Synced { time: meta.time() }; } From edae0cfcfc9c56f63d1d928374360c89cf7d6db6 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Wed, 13 Nov 2024 14:51:09 +0400 Subject: [PATCH 08/10] chore(p2p): adjust timeouts for slower network --- p2p/src/channels/streaming_rpc/rpcs/mod.rs | 2 +- p2p/src/p2p_config.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/p2p/src/channels/streaming_rpc/rpcs/mod.rs b/p2p/src/channels/streaming_rpc/rpcs/mod.rs index 4330b3c431..5b2daaf9e5 100644 --- a/p2p/src/channels/streaming_rpc/rpcs/mod.rs +++ b/p2p/src/channels/streaming_rpc/rpcs/mod.rs @@ -47,7 +47,7 @@ impl P2pStreamingRpcKind { pub fn timeout(self, _config: &P2pTimeouts) -> Option { match self { // TODO(binier): use config - Self::StagedLedgerParts => Some(Duration::from_secs(20)), + Self::StagedLedgerParts => Some(Duration::from_secs(30)), } } } diff --git a/p2p/src/p2p_config.rs b/p2p/src/p2p_config.rs index dc0be207f8..61e46a4d2c 100644 --- a/p2p/src/p2p_config.rs +++ b/p2p/src/p2p_config.rs @@ -99,7 +99,7 @@ impl Default for P2pTimeouts { ), outgoing_connection_timeout: from_env_or( "OUTGOING_CONNECTION_TIMEOUT", - Some(Duration::from_secs(10)), + Some(Duration::from_secs(15)), ), 
reconnect_timeout: from_env_or("RECONNECT_TIMEOUT", Some(Duration::from_secs(1))), incoming_error_reconnect_timeout: from_env_or( @@ -112,15 +112,15 @@ impl Default for P2pTimeouts { ), best_tip_with_proof: from_env_or( "BEST_TIP_WITH_PROOF_TIMEOUT", - Some(Duration::from_secs(10)), + Some(Duration::from_secs(15)), ), - ledger_query: from_env_or("LEDGER_QUERY_TIMEOUT", Some(Duration::from_secs(2))), + ledger_query: from_env_or("LEDGER_QUERY_TIMEOUT", Some(Duration::from_secs(4))), staged_ledger_aux_and_pending_coinbases_at_block: from_env_or( "STAGED_LEDGER_AUX_AND_PENDING_COINBASES_AT_BLOCK_TIMEOUT", - Some(Duration::from_secs(120)), + Some(Duration::from_secs(180)), ), - block: from_env_or("BLOCK_TIMEOUT", Some(Duration::from_secs(5))), - snark: from_env_or("SNARK_TIMEOUT", Some(Duration::from_secs(5))), + block: from_env_or("BLOCK_TIMEOUT", Some(Duration::from_secs(8))), + snark: from_env_or("SNARK_TIMEOUT", Some(Duration::from_secs(8))), initial_peers: from_env_or("INITIAL_PEERS_TIMEOUT", Some(Duration::from_secs(5))), kademlia_bootstrap: from_env_or( "KADEMLIA_BOOTSTRAP_TIMEOUT", From 924db220af88b1a8fd052ab1a23fddc3cfb02b11 Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Wed, 13 Nov 2024 14:51:40 +0400 Subject: [PATCH 09/10] chore(p2p/webrtc): add openmina stun server --- p2p/src/service_impl/webrtc/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/src/service_impl/webrtc/mod.rs b/p2p/src/service_impl/webrtc/mod.rs index 8b669f9412..e57589ae4d 100644 --- a/p2p/src/service_impl/webrtc/mod.rs +++ b/p2p/src/service_impl/webrtc/mod.rs @@ -149,11 +149,11 @@ pub struct RTCChannelConfig { impl Default for RTCConfigIceServers { fn default() -> Self { Self(vec![ - // RTCConfigIceServer { - // urls: vec!["stun:65.109.110.75:3478".to_owned()], - // username: Some("openmina".to_owned()), - // credential: Some("webrtc".to_owned()), - // }, + RTCConfigIceServer { + urls: vec!["stun:65.109.110.75:3478".to_owned()], + username: Some("openmina".to_owned()), + credential: Some("webrtc".to_owned()), + }, RTCConfigIceServer { urls: vec![ "stun:stun.l.google.com:19302".to_owned(), From 36a81f1579093d2c924ce944773dc2214c49c59d Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Thu, 14 Nov 2024 15:15:19 +0400 Subject: [PATCH 10/10] fix(node/replay): postcard deserialization failure --- .../genesis/transition_frontier_genesis_config.rs | 1 - node/testing/src/server/simulator.rs | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs b/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs index 6ae8c089d6..3b561d8776 100644 --- a/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs +++ b/node/src/transition_frontier/genesis/transition_frontier_genesis_config.rs @@ -33,7 +33,6 @@ use crate::{ pub use GenesisConfig as TransitionFrontierGenesisConfig; #[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(tag = "kind")] pub enum GenesisConfig { Counts { whales: usize, diff --git a/node/testing/src/server/simulator.rs b/node/testing/src/server/simulator.rs index e033d53012..f4df9a0df1 100644 --- a/node/testing/src/server/simulator.rs +++ b/node/testing/src/server/simulator.rs @@ -84,7 +84,7 @@ async fn simulation_create( }; while let Some(cluster_mutex) = cluster_mutex.upgrade() { let mut cluster = cluster_mutex.lock().await; - let mut runner = ClusterRunner::new(&mut *cluster, |_| {}); + let mut runner = ClusterRunner::new(&mut 
cluster, |_| {}); let _ = tokio::time::timeout(Duration::from_millis(500), simulator.run(&mut runner)) .await; @@ -93,5 +93,5 @@ async fn simulation_create( }); let cluster_id = setup_rx.await.unwrap()?; - Ok(SimulationCreateResponse { cluster_id }).map(Json) + Ok(Json(SimulationCreateResponse { cluster_id })) }
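The `?a=<base64url(JSON)>` hand-off introduced above ties the testing server to the frontend: `/clusters/:cluster_id/webnode/lock-random-bp` serializes the cluster id plus a block-producer key pair with serde_json, URL-safe-base64-encodes it into the redirect query, and web-node.service.ts later recovers it from localStorage via `JSON.parse(atob(...))`. Below is a minimal Rust sketch of that encode/decode round trip, assuming the same base64 0.22 and serde_json crates already used in node/testing; the key strings are placeholders, not real keys.

use base64::{engine::general_purpose::URL_SAFE, Engine as _};

fn main() {
    // Same shape as the JSON built by webnode_lock_random_bp; keys are placeholders.
    let args = serde_json::json!({
        "network": 1u16,
        "block_producer": {
            "sec_key": "EKE...placeholder",
            "pub_key": "B62...placeholder",
        },
    })
    .to_string();

    // What the handler puts into the redirect: `/?a={encoded}`.
    let encoded = URL_SAFE.encode(&args);
    println!("/?a={encoded}");

    // The decoding side (done with `atob` + `JSON.parse` in web-node.service.ts).
    let decoded = URL_SAFE.decode(encoded.as_bytes()).expect("valid base64");
    let parsed: serde_json::Value = serde_json::from_slice(&decoded).expect("valid JSON");
    assert_eq!(parsed["network"], 1);
}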
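The new `SignalingMethod::HttpsProxy` variant lets a webnode dial a cluster node through the testing server's SSL port instead of the node's plain-HTTP signaling endpoint: the dial address carries `/https_proxy/<cluster_id>/<host>/<port>`, `http_url()` rewrites it to the proxy route `/clusters/<cluster_id>/mina/webrtc/signal`, and the server forwards the offer with `webrtc_signal_send`. A standalone sketch of that parse-and-rewrite step, with simplified field names rather than the crate's real `HttpSignalingInfo` type:

#[derive(Debug)]
struct HttpsProxyAddr {
    cluster_id: u16,
    host: String,
    port: u16,
}

// Expected shape: `/https_proxy/<cluster_id>/<host>/<port>`.
fn parse_https_proxy(s: &str) -> Option<HttpsProxyAddr> {
    let rest = s.strip_prefix("/https_proxy/")?;
    let mut it = rest.splitn(3, '/').filter(|v| !v.trim().is_empty());
    Some(HttpsProxyAddr {
        cluster_id: it.next()?.parse().ok()?,
        host: it.next()?.to_owned(),
        port: it.next()?.parse().ok()?,
    })
}

// Mirrors the `HttpsProxy` arm of `SignalingMethod::http_url`.
fn signaling_http_url(addr: &HttpsProxyAddr) -> String {
    format!(
        "https://{}:{}/clusters/{}/mina/webrtc/signal",
        addr.host, addr.port, addr.cluster_id
    )
}

fn main() {
    let addr = parse_https_proxy("/https_proxy/1/127.0.0.1/443").expect("valid addr");
    assert_eq!(
        signaling_http_url(&addr),
        "https://127.0.0.1:443/clusters/1/mina/webrtc/signal"
    );
    println!("{addr:?}");
}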
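The `override_genesis_state_timestamp` path in server/simulator.rs converts the run's initial `redux::Timestamp` into the genesis block time before wrapping it in `BlockTimeTimeStableV1`; the `/ 1_000_000` factor suggests a nanoseconds-to-milliseconds conversion (an assumption based on this patch and the nanosecond durations used elsewhere in the node). A tiny sketch of just that unit conversion:

// Assumed units: redux::Timestamp counts nanoseconds since the Unix epoch,
// while Mina block times are milliseconds since the Unix epoch.
fn nanos_to_block_time_millis(nanos_since_epoch: u64) -> u64 {
    nanos_since_epoch / 1_000_000
}

fn main() {
    // An arbitrary instant expressed in nanoseconds since the Unix epoch.
    let now_nanos: u64 = 1_731_542_400_000_000_000;
    assert_eq!(nanos_to_block_time_millis(now_nanos), 1_731_542_400_000);
}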