diff --git a/node/common/src/service/block_producer/mod.rs b/node/common/src/service/block_producer/mod.rs index bddaf8c950..94c350923f 100644 --- a/node/common/src/service/block_producer/mod.rs +++ b/node/common/src/service/block_producer/mod.rs @@ -3,8 +3,9 @@ mod vrf_evaluator; use ledger::proofs::{ block::BlockParams, gates::get_provers, generate_block_proof, transaction::ProofError, }; -use mina_p2p_messages::v2::{ - MinaBaseProofStableV2, ProverExtendBlockchainInputStableV2, StateHash, +use mina_p2p_messages::{ + binprot::BinProtWrite, + v2::{MinaBaseProofStableV2, ProverExtendBlockchainInputStableV2, StateHash}, }; use node::{ account::AccountSecretKey, @@ -98,8 +99,27 @@ impl node::service::BlockProducerService for crate::NodeService { let tx = self.event_sender().clone(); thread::spawn(move || { - let res = prove(input, keypair, false).map_err(|err| format!("{err:?}")); + let res = prove(input.clone(), keypair, false).map_err(|err| format!("{err:?}")); + if res.is_err() { + // IMPORTANT: Make sure that `input` here is a copy from before `prove` is called, we don't + // want to leak the private key. + if let Err(error) = dump_failed_block_proof_input(block_hash.clone(), input) { + eprintln!("ERROR when dumping failed block proof inputs: {}", error); + } + } let _ = tx.send(BlockProducerEvent::BlockProve(block_hash, res).into()); }); } } + +fn dump_failed_block_proof_input( + block_hash: StateHash, + input: Box, +) -> std::io::Result<()> { + let filename = format!("/tmp/failed_block_proof_input_{block_hash}.binprot"); + println!("Dumping failed block proof to {filename}"); + let mut file = std::fs::File::create(&filename)?; + input.binprot_write(&mut file)?; + file.sync_all()?; + Ok(()) +} diff --git a/node/src/ledger/ledger_manager.rs b/node/src/ledger/ledger_manager.rs index fbab2a4c09..15dfeb381a 100644 --- a/node/src/ledger/ledger_manager.rs +++ b/node/src/ledger/ledger_manager.rs @@ -386,6 +386,21 @@ impl LedgerManager { } } + pub fn get_accounts( + &self, + ledger_hash: &LedgerHash, + account_ids: Vec, + ) -> Result, String> { + // TODO: this should be asynchronous + match self.call_sync(LedgerRequest::AccountsGet { + ledger_hash: ledger_hash.clone(), + account_ids, + }) { + Ok(LedgerResponse::AccountsGet(result)) => result, + _ => panic!("get_accounts failed"), + } + } + #[allow(clippy::type_complexity)] pub fn producers_with_delegates( &self, diff --git a/node/src/transaction_pool/transaction_pool_actions.rs b/node/src/transaction_pool/transaction_pool_actions.rs index f33e852e67..6afc5a7e49 100644 --- a/node/src/transaction_pool/transaction_pool_actions.rs +++ b/node/src/transaction_pool/transaction_pool_actions.rs @@ -5,7 +5,7 @@ use ledger::{ diff::{self, BestTipDiff, DiffVerified}, ValidCommandWithHash, }, - Account, AccountId, BaseLedger as _, + Account, AccountId, }; use mina_p2p_messages::{ list::List, @@ -108,29 +108,30 @@ impl TransactionPoolEffectfulAction { openmina_core::log::system_time(); kind = "Info", summary = "fetching accounts for tx pool"); - let best_tip_mask = match store.service().ledger_manager().get_mask(&ledger_hash) { - Some((mask, _)) => mask, - None => { + // FIXME: the ledger ctx `get_accounts` function doesn't ensure that every account we + // asked for is included in the result. + // TODO: should be asynchronous. Once asynchronous, watch out for race + // conditions between tx pool and transition frontier. By the time the + // accounts have been fetched the best tip may have changed already. + let accounts = match store + .service() + .ledger_manager() + .get_accounts(&ledger_hash, account_ids.iter().cloned().collect()) + { + Ok(accounts) => accounts, + Err(err) => { openmina_core::log::error!( openmina_core::log::system_time(); kind = "Error", summary = "failed to fetch accounts for tx pool", - error = format!("ledger {:?} not found", ledger_hash)); + error = format!("ledger {:?}, error: {:?}", ledger_hash, err)); return; } }; - let accounts = account_ids + let accounts = accounts .into_iter() - .filter_map(|account_id| { - best_tip_mask - .location_of_account(&account_id) - .and_then(|addr| { - best_tip_mask - .get(addr) - .map(|account| (account_id, *account)) - }) - }) + .map(|account| (account.id(), account)) .collect::>(); store.dispatch_callback(on_result, (accounts, pending_id, from_rpc)); diff --git a/p2p/build.rs b/p2p/build.rs index 12c689f312..9ece6fde94 100644 --- a/p2p/build.rs +++ b/p2p/build.rs @@ -8,5 +8,5 @@ fn main() { ], &["src/network/pubsub", "src/network/identify"], ) - .unwrap(); + .expect("Proto build failed"); } diff --git a/p2p/src/identity/peer_id.rs b/p2p/src/identity/peer_id.rs index ee7ea2f5fd..43fc3d1d72 100644 --- a/p2p/src/identity/peer_id.rs +++ b/p2p/src/identity/peer_id.rs @@ -107,7 +107,9 @@ impl FromStr for PeerId { if size != 33 { return Err(bs58::decode::Error::BufferTooSmall); } - Ok(Self::from_bytes(bytes[1..33].try_into().unwrap())) + Ok(Self::from_bytes( + bytes[1..33].try_into().expect("Size checked above"), + )) } } @@ -142,14 +144,6 @@ impl TryFrom for libp2p_identity::PeerId { } } -impl PartialEq for PeerId { - fn eq(&self, other: &libp2p_identity::PeerId) -> bool { - let key = libp2p_identity::PublicKey::try_decode_protobuf(other.as_ref().digest()).unwrap(); - let bytes = key.try_into_ed25519().unwrap().to_bytes(); - self == &PeerId::from_bytes(bytes) - } -} - impl Serialize for PeerId { fn serialize(&self, serializer: S) -> Result where @@ -209,16 +203,16 @@ mod tests { #[test] fn test_peer_id_bs58() { let s = "2bEgBrPTzL8wov2D4Kz34WVLCxR4uCarsBmHYXWKQA5wvBQzd9H"; - let id: PeerId = s.parse().unwrap(); + let id: PeerId = s.parse().expect("Parsing failed"); assert_eq!(s, id.to_string()); } #[test] fn test_libp2p_peer_id_conv() { let s = "12D3KooWEiGVAFC7curXWXiGZyMWnZK9h8BKr88U8D5PKV3dXciv"; - let id: libp2p_identity::PeerId = s.parse().unwrap(); - let conv: PeerId = id.try_into().unwrap(); - let id_conv: libp2p_identity::PeerId = conv.try_into().unwrap(); + let id: libp2p_identity::PeerId = s.parse().expect("Parsing failed"); + let conv: PeerId = id.try_into().expect("Parsing failed"); + let id_conv: libp2p_identity::PeerId = conv.try_into().expect("Parsing failed"); assert_eq!(id_conv, id); } @@ -226,9 +220,9 @@ mod tests { #[ignore = "doesn't work"] fn test_bare_base58btc_pk() { let s = "QmSXffHzFVSEoQCYBS1bPpCn4vgGEpQnCA9NLYuhamPBU3"; - let id: libp2p_identity::PeerId = s.parse().unwrap(); - let conv: PeerId = id.try_into().unwrap(); - let id_conv: libp2p_identity::PeerId = conv.try_into().unwrap(); + let id: libp2p_identity::PeerId = s.parse().expect("Error parsing"); + let conv: PeerId = id.try_into().expect("Error converting"); + let id_conv: libp2p_identity::PeerId = conv.try_into().expect("Error converting"); assert_eq!(id_conv, id); } } diff --git a/p2p/src/identity/public_key.rs b/p2p/src/identity/public_key.rs index 6e21301456..14763b8661 100644 --- a/p2p/src/identity/public_key.rs +++ b/p2p/src/identity/public_key.rs @@ -61,7 +61,7 @@ impl FromStr for PublicKey { bs58::decode::Error::BufferTooSmall.to_string(), )); } - Self::from_bytes(bytes[1..33].try_into().unwrap()) + Self::from_bytes(bytes[1..33].try_into().expect("Size checked above")) .map_err(|err| PublicKeyFromStrError::Ed25519(err.to_string())) } } diff --git a/p2p/src/identity/secret_key.rs b/p2p/src/identity/secret_key.rs index b4f89122cf..21e24924aa 100644 --- a/p2p/src/identity/secret_key.rs +++ b/p2p/src/identity/secret_key.rs @@ -77,7 +77,9 @@ impl FromStr for SecretKey { bs58::decode::Error::BufferTooSmall.to_string(), )); } - Ok(Self::from_bytes(bytes[1..33].try_into().unwrap())) + Ok(Self::from_bytes( + bytes[1..33].try_into().expect("Size checked above"), + )) } } diff --git a/p2p/src/network/kad/p2p_network_kad_internals.rs b/p2p/src/network/kad/p2p_network_kad_internals.rs index 5c48301cee..d5bc43f6d4 100644 --- a/p2p/src/network/kad/p2p_network_kad_internals.rs +++ b/p2p/src/network/kad/p2p_network_kad_internals.rs @@ -643,7 +643,7 @@ mod tests { } fn entry_with_peer_id(peer_id: PeerId) -> P2pNetworkKadEntry { - let key = peer_id.try_into().unwrap(); + let key = peer_id.try_into().expect("Error converting PeerId"); P2pNetworkKadEntry { key, peer_id, @@ -655,11 +655,12 @@ mod tests { #[test] fn test_key_generation() { let random_peer_id = SecretKey::rand().public_key().peer_id(); - let libp2p_peer_id = libp2p_identity::PeerId::try_from(random_peer_id).unwrap(); + let libp2p_peer_id = + libp2p_identity::PeerId::try_from(random_peer_id).expect("Conversion failed"); let cid = CID::from(libp2p_peer_id); - let key0 = P2pNetworkKadKey::try_from(&random_peer_id).unwrap(); - let key1 = P2pNetworkKadKey::try_from(random_peer_id).unwrap(); + let key0 = P2pNetworkKadKey::try_from(&random_peer_id).expect("Conversion failed"); + let key1 = P2pNetworkKadKey::try_from(random_peer_id).expect("Conversion failed"); let key2 = P2pNetworkKadKey::from(cid); assert_eq!(key0, key1); @@ -756,14 +757,17 @@ mod tests { let closest = BTreeSet::from_iter(iter); println!("{}", closest.len()); - let max_closest_dist = closest.iter().max_by_key(|e| entry.dist(e)).unwrap(); + let max_closest_dist = closest + .iter() + .max_by_key(|e| entry.dist(e)) + .expect("Failed to find entry"); let min_non_closest_dist = rt .buckets .iter() .flat_map(|e| e.iter()) .filter(|e| !closest.contains(*e)) .min_by_key(|e| entry.dist(e)) - .unwrap(); + .expect("Failed to find entry"); let max = entry.dist(max_closest_dist); let min = entry.dist(min_non_closest_dist); @@ -791,7 +795,10 @@ mod tests { let find_node = rt.find_node(&entry.key); let closest = BTreeSet::from_iter(find_node); - let max_closest_dist = closest.iter().max_by_key(|e| entry.dist(e)).unwrap(); + let max_closest_dist = closest + .iter() + .max_by_key(|e| entry.dist(e)) + .expect("Error finding entry"); let min_non_closest_dist = rt .buckets .iter() @@ -799,7 +806,7 @@ mod tests { .filter(|e| e.key != entry.key && e.key != rt.this_key) .filter(|e| !closest.contains(*e)) .min_by_key(|e| entry.dist(e)) - .unwrap(); + .expect("Error finding entry"); let max = entry.dist(max_closest_dist); let min = entry.dist(min_non_closest_dist); diff --git a/p2p/src/network/kad/p2p_network_kad_protocol.rs b/p2p/src/network/kad/p2p_network_kad_protocol.rs index c7cc0705bd..a463e4b93a 100644 --- a/p2p/src/network/kad/p2p_network_kad_protocol.rs +++ b/p2p/src/network/kad/p2p_network_kad_protocol.rs @@ -302,9 +302,10 @@ pub mod tests { #[test] fn cid_generation() { let random_peer_id = SecretKey::rand().public_key().peer_id(); - let libp2p_peer_id = libp2p_identity::PeerId::try_from(random_peer_id).unwrap(); + let libp2p_peer_id = + libp2p_identity::PeerId::try_from(random_peer_id).expect("PeerId conversion failed"); - let cid0 = CID::try_from(random_peer_id).unwrap(); + let cid0 = CID::try_from(random_peer_id).expect("Error generating CID"); let cid1 = CID::from(libp2p_peer_id); assert_eq!(cid0, cid1); @@ -319,14 +320,14 @@ pub mod tests { let peer_id = "2bEgBrPTzL8wov2D4Kz34WVLCxR4uCarsBmHYXWKQA5wvBQzd9H" .parse::() - .unwrap(); + .expect("Error parsing peer id"); assert_eq!( from_bytes( &libp2p_identity::PeerId::try_from(peer_id) - .unwrap() + .expect("Error converting to PeerId") .to_bytes() ) - .unwrap(), + .expect("Error generating from bytes"), peer_id ); } @@ -347,8 +348,11 @@ pub mod tests { "/ip4/198.51.100.1/tcp/80", "/dns4/ams-2.bootstrap.libp2p.io/tcp/443", ] { - let multiaddr = multiaddr.parse::().unwrap(); - assert_eq!(from_bytes(&multiaddr.to_vec()).unwrap(), multiaddr); + let multiaddr = multiaddr.parse::().expect("Failed to parse"); + assert_eq!( + from_bytes(&multiaddr.to_vec()).expect("Error converting from bytes"), + multiaddr + ); } } @@ -356,7 +360,7 @@ pub mod tests { fn find_nodes_from_wire() { let input = "2c0804500a1226002408011220bcbfc53faa51a1410b7599c1e4411d5ac45ed5a1ffdc4673c1a6e2b9e9125c4d"; - let bytes = hex::decode(input).unwrap(); + let bytes = hex::decode(input).expect("Error decoding"); let protobuf_message = BytesReader::from_bytes(&bytes) .read_message::(&bytes) .expect("should be able to decode"); @@ -375,9 +379,9 @@ pub mod tests { fn find_nodes_from_wire_len() { let input = "2c0804500a1226002408011220bcbfc53faa51a1410b7599c1e4411d5ac45ed5a1ffdc4673c1a6e2b9e9125c4d"; - let bytes = hex::decode(input).unwrap(); + let bytes = hex::decode(input).expect("Error decoding"); let from_bytes = &mut BytesReader::from_bytes(&bytes); - let len = from_bytes.read_varint32(&bytes).unwrap(); + let len = from_bytes.read_varint32(&bytes).expect("Error reading len"); println!("{} {}", len, from_bytes.len()); let protobuf_message = BytesReader::from_bytes(&bytes) diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_effects.rs b/p2p/src/network/pubsub/p2p_network_pubsub_effects.rs index 2169f65fa2..bf9c60fefc 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_effects.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_effects.rs @@ -222,16 +222,6 @@ impl P2pNetworkPubsubAction { } P2pNetworkPubsubAction::OutgoingMessage { msg, peer_id } => { if !message_is_empty(&msg) { - // println!( - // "(pubsub) {this} -> {peer_id}, {:?}, {:?}, {}", - // msg.subscriptions, - // msg.control, - // msg.publish.len() - // ); - // for ele in &msg.publish { - // let id = super::p2p_network_pubsub_state::compute_message_id(ele); - // println!("{}", std::str::from_utf8(&id).unwrap()); - // } let mut data = vec![]; if prost::Message::encode_length_delimited(&msg, &mut data).is_err() { store.dispatch(P2pNetworkPubsubAction::OutgoingMessageError { diff --git a/p2p/src/p2p_reducer.rs b/p2p/src/p2p_reducer.rs index a4cc766d09..20adf7663a 100644 --- a/p2p/src/p2p_reducer.rs +++ b/p2p/src/p2p_reducer.rs @@ -126,8 +126,6 @@ impl P2pState { crate::identify::P2pIdentifyAction::UpdatePeerInformation { peer_id, info } => { if let Some(peer) = state.peers.get_mut(peer_id) { peer.identify = Some(*info.clone()); - } else { - unreachable!() } } } diff --git a/p2p/src/service_impl/libp2p/behavior.rs b/p2p/src/service_impl/libp2p/behavior.rs deleted file mode 100644 index b87dd8cde3..0000000000 --- a/p2p/src/service_impl/libp2p/behavior.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::collections::BTreeMap; - -use libp2p::{gossipsub, identify, swarm::NetworkBehaviour, PeerId}; -use mina_p2p_messages::rpc_kernel::RpcTag; -use openmina_core::channels::mpsc; - -use crate::P2pEvent; - -use libp2p_rpc_behaviour::{Behaviour as RpcBehaviour, Event as RpcEvent, StreamId}; - -use libp2p::kad::{self, record::store::MemoryStore}; - -#[derive(NetworkBehaviour)] -#[behaviour(to_swarm = "Event")] -pub struct Behaviour> { - pub gossipsub: gossipsub::Behaviour, - pub rpc: RpcBehaviour, - pub identify: identify::Behaviour, - pub kademlia: kad::Behaviour, - #[behaviour(ignore)] - pub chain_id: String, - #[behaviour(ignore)] - pub event_source_sender: mpsc::UnboundedSender, - // TODO(vlad9486): move maps inside `RpcBehaviour` - // map msg_id into (tag, version) - #[behaviour(ignore)] - pub ongoing: BTreeMap<(PeerId, u64), (RpcTag, u32)>, - // map from (peer, msg_id) into (stream_id, tag, version) - // - #[behaviour(ignore)] - pub ongoing_incoming: BTreeMap<(PeerId, u64), (StreamId, String, u32)>, -} - -#[derive(Debug, derive_more::From)] -pub enum Event { - // Identify(IdentifyEvent), - Gossipsub(gossipsub::Event), - Rpc((PeerId, RpcEvent)), - Identify(identify::Event), - Kademlia(kad::Event), -} - diff --git a/p2p/src/service_impl/libp2p/mod.rs b/p2p/src/service_impl/libp2p/mod.rs deleted file mode 100644 index 58aa73d053..0000000000 --- a/p2p/src/service_impl/libp2p/mod.rs +++ /dev/null @@ -1,1145 +0,0 @@ -mod behavior; -pub use behavior::Event as BehaviourEvent; -pub use behavior::*; - -use mina_p2p_messages::rpc::GetSomeInitialPeersV1ForV2; - -use std::collections::{BTreeMap, BTreeSet}; -use std::io; -use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; -use std::time::Duration; - -use mina_p2p_messages::binprot::{self, BinProtRead, BinProtWrite}; -use mina_p2p_messages::v2::NetworkPoolSnarkPoolDiffVersionedStableV2; -use openmina_core::channels::mpsc; -use openmina_core::snark::Snark; - -use libp2p::core::muxing::StreamMuxerBox; -use libp2p::core::transport; -use libp2p::futures::{select, FutureExt, StreamExt}; -use libp2p::gossipsub::{ - Behaviour as Gossipsub, ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent, - IdentTopic, MessageAcceptance, MessageAuthenticity, -}; -use libp2p::identity::Keypair; -use libp2p::kad::record::store::MemoryStore; -use libp2p::kad::Mode; -use libp2p::pnet::{PnetConfig, PreSharedKey}; -use libp2p::swarm::dial_opts::DialOpts; -use libp2p::swarm::SwarmEvent; -use libp2p::{identify, kad}; -use libp2p::{noise, StreamProtocol}; -use libp2p::{Multiaddr, PeerId, Swarm, Transport}; -pub use mina_p2p_messages::gossip::GossipNetMessageV2 as GossipNetMessage; - -use libp2p_rpc_behaviour::{BehaviourBuilder, Event as RpcBehaviourEvent, StreamId}; - -use crate::channels::best_tip::BestTipPropagationChannelMsg; -use crate::channels::rpc::{ - BestTipWithProof, P2pRpcRequest, P2pRpcResponse, RpcChannelMsg, - StagedLedgerAuxAndPendingCoinbases, -}; -use crate::channels::ChannelMsg; -use crate::connection::outgoing::{ - P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts, -}; -use crate::identity::SecretKey; -use crate::{P2pChannelEvent, P2pConnectionEvent, P2pDiscoveryEvent, P2pEvent, P2pListenEvent}; - -use super::TaskSpawner; - -/// Type alias for libp2p transport -pub type P2PTransport = (PeerId, StreamMuxerBox); -/// Type alias for boxed libp2p transport -pub type BoxedP2PTransport = transport::Boxed; - -#[derive(Debug)] -pub enum Cmd { - Dial(PeerId, Vec), - Disconnect(PeerId), - SendMessage(PeerId, ChannelMsg), - SnarkBroadcast(Snark, u32), - RunDiscovery(Vec<(PeerId, Multiaddr)>), - FindNode(PeerId), -} - -pub struct Libp2pService { - cmd_sender: mpsc::UnboundedSender, -} - -async fn determine_own_ip() -> BTreeSet { - use std::net::{Ipv4Addr, Ipv6Addr}; - - let local_addresses = [ - IpAddr::V4(Ipv4Addr::UNSPECIFIED), - IpAddr::V6(Ipv6Addr::UNSPECIFIED), - ]; - let services = [ - "https://ifconfig.co/ip", - "https://bot.whatismyipaddress.com", - "https://api.ipify.org", - ]; - - let clients = local_addresses.into_iter().filter_map(|addr| { - reqwest::ClientBuilder::new() - .local_address(addr) - .timeout(Duration::from_secs(20)) - .build() - .ok() - }); - - let (tx, mut rx) = mpsc::unbounded_channel(); - for client in clients { - for service in services { - let tx = tx.clone(); - let client = client.clone(); - tokio::spawn(async move { - let addr = { - client - .get(service) - .send() - .await - .ok()? - .text() - .await - .ok()? - .trim_end_matches('\n') - .parse::() - .ok()? - }; - tx.send(addr).unwrap_or_default(); - - Some(()) - }); - } - } - drop(tx); - - let mut addresses = BTreeSet::new(); - while let Some(addr) = rx.recv().await { - addresses.insert(addr); - } - - addresses -} - -#[allow(dead_code)] -async fn determine_own_ip_stun(stun_addr: SocketAddr) -> io::Result { - use faster_stun::{attribute, Decoder, Kind, Method, Payload}; - use tokio::net::UdpSocket; - - let socket = UdpSocket::bind(SocketAddr::from(([0; 4], 0))).await?; - tokio::time::timeout(Duration::from_secs(10), async move { - loop { - let mut request = - *b"\x00\x01\x00\x00\x21\x12\xa4\x42\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"; - request[8..].clone_from_slice(&rand::random::<[u8; 12]>()); - socket.send_to(&request, stun_addr).await?; - let mut buf = [0; 0x10000]; - let (_, remote_addr) = socket.recv_from(&mut buf).await?; - let mut decoder = Decoder::new(); - if let Ok(Payload::Message(msg)) = decoder.decode(&buf) { - if msg.method == Method::Binding(Kind::Response) - && remote_addr == stun_addr - && msg.token == &request[8..] - { - if let Some(addr) = msg.get::() { - return Ok(addr.ip()); - } - } - } - } - }).await.map_err(|_| io::Error::from(io::ErrorKind::TimedOut)).and_then(|x| x) -} - -impl Libp2pService { - const GOSSIPSUB_TOPIC: &'static str = "coda/consensus-messages/0.0.1"; - - pub fn mocked() -> (Self, mpsc::UnboundedReceiver) { - let (cmd_sender, rx) = mpsc::unbounded_channel(); - (Self { cmd_sender }, rx) - } - - pub fn run( - libp2p_port: Option, - secret_key: SecretKey, - chain_id: Vec, - event_source_sender: mpsc::UnboundedSender, - spawner: S, - ) -> Self - where - E: 'static + Send + From, - S: TaskSpawner, - { - let topics_iter = IntoIterator::into_iter([ - Self::GOSSIPSUB_TOPIC, - "mina/block/1.0.0", - "mina/tx/1.0.0", - "mina/snark-work/1.0.0", - ]); - - let identity_keys = Keypair::ed25519_from_bytes(secret_key.to_bytes()) - .expect("secret key bytes must be valid"); - - let message_authenticity = MessageAuthenticity::Signed(identity_keys.clone()); - let gossipsub_config = GossipsubConfigBuilder::default() - .max_transmit_size(1024 * 1024 * 32) - .validate_messages() - .build() - .unwrap(); - let mut gossipsub: Gossipsub = - Gossipsub::new(message_authenticity, gossipsub_config).unwrap(); - topics_iter - .map(IdentTopic::new) - .for_each(|topic| assert!(gossipsub.subscribe(&topic).unwrap())); - - let identify = identify::Behaviour::new(identify::Config::new( - "ipfs/0.1.0".to_string(), - identity_keys.public(), - )); - - let peer_id = identity_keys.public().to_peer_id(); - let kad_config = { - let mut c = kad::Config::default(); - c.set_protocol_names(vec![StreamProtocol::new("/coda/kad/1.0.0")]); - c - }; - let kademlia = kad::Behaviour::with_config(peer_id, MemoryStore::new(peer_id), kad_config); - - let behaviour = Behaviour { - gossipsub, - rpc: { - use mina_p2p_messages::rpc::{ - AnswerSyncLedgerQueryV2, GetAncestryV2, GetBestTipV2, - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2, GetTransitionChainProofV1ForV2, - GetTransitionChainV2, - }; - - BehaviourBuilder::default() - .register_method::() - .register_method::() - .register_method::() - .register_method::() - .register_method::() - .register_method::() - .build() - }, - identify, - kademlia, - chain_id: String::from_utf8(chain_id).unwrap(), - event_source_sender, - ongoing: BTreeMap::default(), - ongoing_incoming: BTreeMap::default(), - }; - - let (cmd_sender, mut cmd_receiver) = mpsc::unbounded_channel(); - let psk = PreSharedKey::new(openmina_core::preshared_key(&behaviour.chain_id)); - - let fut = async move { - let mut swarm = libp2p::SwarmBuilder::with_existing_identity(identity_keys) - .with_tokio() - .with_other_transport(|key| { - let noise_config = noise::Config::new(key).unwrap(); - let mut yamux_config = libp2p::yamux::Config::default(); - - yamux_config.set_protocol_name("/coda/yamux/1.0.0"); - - let base_transport = libp2p::tcp::tokio::Transport::new( - libp2p::tcp::Config::default() - .nodelay(true) - .port_reuse(true), - ); - - base_transport - .and_then(move |socket, _| PnetConfig::new(psk).handshake(socket)) - .upgrade(libp2p::core::upgrade::Version::V1) - .authenticate(noise_config) - .multiplex(yamux_config) - .timeout(Duration::from_secs(60)) - })? - .with_dns()? - .with_behaviour(|_| behaviour)? - .build(); - - swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); - - if let Some(port) = libp2p_port { - for ip in determine_own_ip().await { - let mut addr = Multiaddr::from(ip); - addr.push(libp2p::multiaddr::Protocol::Tcp(port)); - swarm.add_external_address(addr); - } - - if let Err(err) = swarm.listen_on(format!("/ip6/::/tcp/{port}").parse().unwrap()) { - openmina_core::log::error!( - openmina_core::log::system_time(); - kind = "Libp2pListenError", - summary = format!("libp2p failed to start listener on ipv6 at port: {port}. error: {err:?}"), - ); - } - if let Err(err) = - swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap()) - { - openmina_core::log::error!( - openmina_core::log::system_time(); - kind = "Libp2pListenError", - summary = format!("libp2p failed to start listener on ipv4 at port: {port}. error: {err:?}"), - ); - } - } - - loop { - select! { - event = swarm.next() => match event { - Some(event) => Self::handle_event(&mut swarm, event).await, - None => break, - }, - cmd = cmd_receiver.recv().fuse() => match cmd { - Some(cmd) => Self::handle_cmd(&mut swarm, cmd).await, - None => break, - } - } - } - - // FIXME: keeping the compiler happy but we need proper handling - Result::<(), Box>::Ok(()) - }; - - spawner.spawn_main("libp2p", fut); - - Self { cmd_sender } - } - - fn gossipsub_send(swarm: &mut Swarm>, msg: &GossipNetMessage) - where - E: From, - { - let mut encoded = vec![0; 8]; - match msg.binprot_write(&mut encoded) { - Ok(_) => {} - Err(_err) => { - // TODO(binier) - return; - // log::error!("Failed to encode GossipSub Message: {:?}", err); - // panic!("{}", err); - } - } - let msg_len = (encoded.len() as u64 - 8).to_le_bytes(); - encoded[..8].clone_from_slice(&msg_len); - - let topic = IdentTopic::new(Self::GOSSIPSUB_TOPIC); - let _ = swarm.behaviour_mut().gossipsub.publish(topic, encoded); - } - - async fn handle_cmd>(swarm: &mut Swarm>, cmd: Cmd) { - match cmd { - Cmd::Dial(peer_id, addrs) => { - let opts = DialOpts::peer_id(peer_id).addresses(addrs).build(); - if let Err(e) = swarm.dial(opts) { - let peer_id = crate::PeerId::from(peer_id); - openmina_core::log::error!( - openmina_core::log::system_time(); - node_id = crate::PeerId::from(*swarm.local_peer_id()).to_string(), - summary = format!("Cmd::Dial(...)"), - peer_id = peer_id.to_string(), - error = e.to_string() - ); - } - } - Cmd::Disconnect(peer_id) => { - let _ = swarm.disconnect_peer_id(peer_id); - } - Cmd::SendMessage(peer_id, msg) => match msg { - ChannelMsg::SnarkPropagation(_) => { - // unsupported. Instead `Cmd::SnarkBroadcast` will be used. - } - ChannelMsg::SnarkJobCommitmentPropagation(_) => { - // unsupported - } - ChannelMsg::BestTipPropagation(msg) => match msg { - BestTipPropagationChannelMsg::GetNext => { - // TODO(binier): mark that peer can send us - // a message now. For now not important as - // we send this message right after we see a - // message from the peer. - } - BestTipPropagationChannelMsg::BestTip(block) => { - // TODO(binier): for each peer, send message cmd - // will be received, yet we are broadcasting to - // every peer every time. It's kinda fine because - // gossipsub protocol will prevent same message - // from being published, but it's still wasteful. - Self::gossipsub_send( - swarm, - &GossipNetMessage::NewState(block.as_ref().clone()), - ); - // TODO(binier): send event: `P2pChannelEvent::Sent` - } - }, - ChannelMsg::Rpc(msg) => { - Self::handle_cmd_rpc(swarm, peer_id, msg) - .expect("binprot write error must not happen, must send valid msg"); - } - }, - Cmd::SnarkBroadcast(snark, nonce) => { - let message = Box::new((snark.statement(), (&snark).into())); - let message = NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message); - let nonce = nonce.into(); - Self::gossipsub_send(swarm, &GossipNetMessage::SnarkPoolDiff { message, nonce }); - } - Cmd::RunDiscovery(peers) => { - for (peer_id, addr) in peers { - swarm.behaviour_mut().kademlia.add_address(&peer_id, addr); - } - - match swarm.behaviour_mut().kademlia.bootstrap() { - Ok(_id) => {} - Err(err) => { - let _ = err; - // TODO: log error - } - } - } - Cmd::FindNode(peer_id) => { - let _id = swarm.behaviour_mut().kademlia.get_closest_peers(peer_id); - } - } - } - - fn handle_cmd_rpc>( - swarm: &mut Swarm>, - peer_id: PeerId, - msg: RpcChannelMsg, - ) -> Result<(), binprot::Error> { - use mina_p2p_messages::{ - core::Info, - rpc::{ - AnswerSyncLedgerQueryV2, GetAncestryV2, GetBestTipV2, - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2, GetTransitionChainV2, - ProofCarryingDataStableV1, ProofCarryingDataWithHashV1, - }, - rpc_kernel::{RpcMethod, RpcResult}, - }; - - let b = swarm.behaviour_mut(); - match msg { - RpcChannelMsg::Request(id, req) => { - let stream_id = StreamId::Outgoing(0); - let key = (peer_id, id); - - match req { - P2pRpcRequest::BestTipWithProof => { - type T = GetBestTipV2; - b.ongoing.insert(key, (T::NAME, T::VERSION)); - b.rpc.query::(peer_id, stream_id, id, ())?; - } - P2pRpcRequest::LedgerQuery(hash, query) => { - type T = AnswerSyncLedgerQueryV2; - b.ongoing.insert(key, (T::NAME, T::VERSION)); - let query = (hash.0.clone(), query); - b.rpc.query::(peer_id, stream_id, id, query)?; - } - P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(hash) => { - type T = GetStagedLedgerAuxAndPendingCoinbasesAtHashV2; - b.ongoing.insert(key, (T::NAME, T::VERSION)); - let query = hash.0.clone(); - b.rpc.query::(peer_id, stream_id, id, query)?; - } - P2pRpcRequest::Block(hash) => { - type T = GetTransitionChainV2; - b.ongoing.insert(key, (T::NAME, T::VERSION)); - let query = vec![hash.0.clone()]; - b.rpc.query::(peer_id, stream_id, id, query)?; - } - P2pRpcRequest::Snark(_) => {} - P2pRpcRequest::InitialPeers => { - type T = GetSomeInitialPeersV1ForV2; - b.ongoing.insert(key, (T::NAME, T::VERSION)); - b.rpc.query::(peer_id, stream_id, id, ())?; - } - }; - } - RpcChannelMsg::Response(id, resp) => { - if let Some((stream_id, tag, version)) = b.ongoing_incoming.remove(&(peer_id, id)) { - match resp { - None => match (tag.as_bytes(), version) { - (GetBestTipV2::NAME, GetBestTipV2::VERSION) => { - type T = GetBestTipV2; - b.rpc.respond::(peer_id, stream_id, id, Ok(None))? - } - (GetAncestryV2::NAME, GetAncestryV2::VERSION) => { - type T = GetAncestryV2; - b.rpc.respond::(peer_id, stream_id, id, Ok(None))? - } - (AnswerSyncLedgerQueryV2::NAME, AnswerSyncLedgerQueryV2::VERSION) => { - // TODO: shouldn't we disable this method in menu? - type T = AnswerSyncLedgerQueryV2; - b.rpc.respond::( - peer_id, - stream_id, - id, - Ok(RpcResult(Err(Info::new("not implemented")))), - )? - } - ( - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME, - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION, - ) => { - type T = GetStagedLedgerAuxAndPendingCoinbasesAtHashV2; - b.rpc.respond::(peer_id, stream_id, id, Ok(None))? - } - (GetTransitionChainV2::NAME, GetTransitionChainV2::VERSION) => { - type T = GetTransitionChainV2; - b.rpc.respond::(peer_id, stream_id, id, Ok(None))? - } - ( - GetSomeInitialPeersV1ForV2::NAME, - GetSomeInitialPeersV1ForV2::VERSION, - ) => { - type T = GetSomeInitialPeersV1ForV2; - b.rpc.respond::(peer_id, stream_id, id, Ok(vec![]))? - } - _ => {} - }, - Some(P2pRpcResponse::BestTipWithProof(msg)) => { - if tag.as_bytes() == GetAncestryV2::NAME { - type T = GetAncestryV2; - let v = msg.proof.0.iter().map(|x| x.0.clone()).collect(); - let r = Ok(Some(ProofCarryingDataWithHashV1 { - data: (*msg.best_tip).clone(), - proof: (v, (*msg.proof.1).clone()), - })); - b.rpc.respond::(peer_id, stream_id, id, r)?; - } else { - type T = GetBestTipV2; - let r = Ok(Some(ProofCarryingDataStableV1 { - data: (*msg.best_tip).clone(), - proof: (msg.proof.0, (*msg.proof.1).clone()), - })); - b.rpc.respond::(peer_id, stream_id, id, r)?; - } - } - Some(P2pRpcResponse::LedgerQuery(msg)) => { - type T = AnswerSyncLedgerQueryV2; - let r = Ok(RpcResult(Ok(msg))); - b.rpc.respond::(peer_id, stream_id, id, r)?; - } - Some(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(msg)) => { - type T = GetStagedLedgerAuxAndPendingCoinbasesAtHashV2; - let r = ( - msg.scan_state.clone(), - msg.staged_ledger_hash.0.clone(), - msg.pending_coinbase.clone(), - msg.needed_blocks.clone(), - ); - let r = Ok(Some(r)); - b.rpc.respond::(peer_id, stream_id, id, r)?; - } - Some(P2pRpcResponse::Block(msg)) => { - type T = GetTransitionChainV2; - let r = Ok(Some(vec![(*msg).clone()])); - b.rpc.respond::(peer_id, stream_id, id, r)?; - } - Some(P2pRpcResponse::Snark(_)) => {} - Some(P2pRpcResponse::InitialPeers(peers)) => { - type T = GetSomeInitialPeersV1ForV2; - let r = Ok(peers - .iter() - .filter_map(P2pConnectionOutgoingInitOpts::try_into_mina_rpc) - .collect()); - b.rpc.respond::(peer_id, stream_id, id, r)?; - } - } - } - } - } - - Ok(()) - } - - async fn handle_event, Err: std::error::Error>( - swarm: &mut Swarm>, - event: SwarmEvent, - ) { - match event { - SwarmEvent::NewListenAddr { - listener_id, - address, - .. - } => { - let maddr = format!("{address}/p2p/{}", swarm.local_peer_id()); - let listener_id = format!("{listener_id:?}"); - openmina_core::log::info!( - openmina_core::log::system_time(); - kind = "Libp2pListenStart", - summary = format!("libp2p.{listener_id} listening on: {maddr}"), - listener_id = listener_id, - maddr = maddr, - ); - let event = P2pEvent::Listen(P2pListenEvent::NewListenAddr { - listener_id: listener_id.into(), - addr: address, - }); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - } - SwarmEvent::ExpiredListenAddr { - listener_id, - address, - .. - } => { - let maddr = format!("{address}/p2p/{}", swarm.local_peer_id()); - let listener_id = format!("{listener_id:?}"); - openmina_core::log::info!( - openmina_core::log::system_time(); - kind = "Libp2pListenStart", - summary = format!("libp2p.{listener_id} stopped listening on: {maddr}"), - listener_id = listener_id, - maddr = maddr, - ); - let event = P2pEvent::Listen(P2pListenEvent::ExpiredListenAddr { - listener_id: listener_id.into(), - addr: address, - }); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - } - SwarmEvent::ListenerError { listener_id, error } => { - let listener_id = format!("{listener_id:?}"); - openmina_core::log::error!( - openmina_core::log::system_time(); - kind = "Libp2pListenError", - summary = format!("libp2p.{listener_id:?} listener error: {error:?}"), - listener_id = listener_id, - ); - let event = P2pEvent::Listen(P2pListenEvent::ListenerError { - listener_id: listener_id.into(), - error: error.to_string(), - }); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - } - SwarmEvent::ListenerClosed { - listener_id, - reason, - .. - } => { - let listener_id = format!("{listener_id:?}"); - openmina_core::log::warn!( - openmina_core::log::system_time(); - kind = "Libp2pListenError", - summary = format!("libp2p.{listener_id} closed. Reason: {reason:?}"), - listener_id = listener_id, - ); - let event = P2pEvent::Listen(P2pListenEvent::ListenerClosed { - listener_id: listener_id.into(), - error: reason.err().map(|err| err.to_string()), - }); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - } - SwarmEvent::IncomingConnection { - connection_id, - local_addr, - send_back_addr, - } => { - let connection_id = format!("{connection_id:?}"); - openmina_core::log::info!( - openmina_core::log::system_time(); - kind = "libp2p::IncomingConnection", - summary = format!("libp2p incoming {connection_id} {local_addr} {send_back_addr}"), - connection_id = connection_id, - ); - } - SwarmEvent::Dialing { peer_id, .. } => { - let peer_id = peer_id - .map(crate::PeerId::from) - .as_ref() - .map(ToString::to_string); - let peer_id = peer_id.as_ref().map_or("", String::as_str); - openmina_core::log::info!( - openmina_core::log::system_time(); - node_id = crate::PeerId::from(*swarm.local_peer_id()).to_string(), - kind = "libp2p::Dialing", - summary = format!("peer_id: {peer_id}"), - peer_id = peer_id, - ); - } - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - swarm.behaviour_mut().identify.push(Some(peer_id)); - let peer_id: crate::PeerId = peer_id.into(); - openmina_core::log::info!( - openmina_core::log::system_time(); - node_id = crate::PeerId::from(*swarm.local_peer_id()).to_string(), - kind = "libp2p::ConnectionEstablished", - summary = format!("peer_id: {}", peer_id), - peer_id = peer_id.to_string(), - ); - let event = P2pEvent::Connection(P2pConnectionEvent::Finalized(peer_id, Ok(()))); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - } - SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { - let peer_id: crate::PeerId = peer_id.into(); - let event = P2pEvent::Connection(P2pConnectionEvent::Closed(peer_id)); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - - // TODO(binier): move to log effects - openmina_core::log::warn!( - openmina_core::log::system_time(); - kind = "PeerDisconnected", - summary = format!("peer_id: {}", peer_id), - peer_id = peer_id.to_string(), - cause = format!("{:?}", cause) - ); - } - SwarmEvent::OutgoingConnectionError { - connection_id: _, - peer_id, - error, - } => { - let peer_id = match peer_id { - Some(v) => v, - None => return, - }; - if peer_id.as_ref().code() == 0x12 { - // cannot report about the failure, - // because our PeerId cannot represent this peer_id - return; - } - let event = P2pEvent::Connection(P2pConnectionEvent::Finalized( - peer_id.into(), - Err(error.to_string()), - )); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - } - SwarmEvent::Behaviour(event) => match event { - BehaviourEvent::Kademlia(event) => { - match event { - kad::Event::RoutingUpdated { - peer, addresses, .. - } => { - if peer.as_ref().code() != 0x12 { - let event = P2pEvent::Discovery(P2pDiscoveryEvent::AddRoute( - peer.into(), - addresses - .iter() - .filter_map(|a| { - P2pConnectionOutgoingInitLibp2pOpts::try_from(a).ok() - }) - .map(P2pConnectionOutgoingInitOpts::LibP2P) - .collect(), - )); - let _ = - swarm.behaviour_mut().event_source_sender.send(event.into()); - } - } - kad::Event::OutboundQueryProgressed { step, result, .. } => { - let b = swarm.behaviour_mut(); - match result { - kad::QueryResult::Bootstrap(Ok(_v)) => { - use sha2::digest::{FixedOutput, Update}; - - if step.last { - let r = sha2::Sha256::default() - .chain(b"/coda/0.0.1/") - .chain(&b.chain_id) - .finalize_fixed(); - // TODO(vlad9486): use multihash, remove hardcode - let mut key = vec![18, 32]; - key.extend_from_slice(&r); - let key = kad::record::Key::new(&key); - - if let Err(_err) = b.kademlia.start_providing(key) { - // memory storage should not return error - } - // initial bootstrap is done - b.event_source_sender - .send( - P2pEvent::Discovery(P2pDiscoveryEvent::Ready) - .into(), - ) - .unwrap_or_default(); - } - } - kad::QueryResult::GetClosestPeers(Ok(v)) => { - let peers = v.peers.into_iter().filter_map(|peer_id| { - if peer_id.as_ref().code() == 0x12 { - return None; - } - Some(peer_id.into()) - }); - let response = P2pDiscoveryEvent::DidFindPeers(peers.collect()); - b.event_source_sender - .send(P2pEvent::Discovery(response).into()) - .unwrap_or_default() - } - kad::QueryResult::GetClosestPeers(Err(err)) => { - let response = - P2pDiscoveryEvent::DidFindPeersError(err.to_string()); - b.event_source_sender - .send(P2pEvent::Discovery(response).into()) - .unwrap_or_default() - } - _ => {} - } - } - _ => {} - } - } - BehaviourEvent::Gossipsub(GossipsubEvent::Message { - propagation_source, - message_id, - message, - }) => { - // We will manually publish applied blocks. - // TODO(binier): better approach - let _ = swarm - .behaviour_mut() - .gossipsub - .report_message_validation_result( - &message_id, - &propagation_source, - MessageAcceptance::Ignore, - ); - - let bytes = &message.data; - let res = if bytes.len() < 8 { - Err("message too short".to_owned()) - } else { - let len = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); - let data = &bytes[8..]; - assert_eq!(len, data.len() as u64); - GossipNetMessage::binprot_read(&mut &*data) - .map_err(|err| format!("{err:?}")) - }; - let res = match res { - Err(err) => Err(err), - Ok(GossipNetMessage::NewState(block)) => { - Ok(ChannelMsg::BestTipPropagation( - BestTipPropagationChannelMsg::BestTip(block.into()), - )) - } - Ok(GossipNetMessage::SnarkPoolDiff { message, nonce }) => match message { - // TODO(binier): Why empty? Should we error? - NetworkPoolSnarkPoolDiffVersionedStableV2::Empty => return, - NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work) => { - let event = - P2pEvent::Channel(P2pChannelEvent::Libp2pSnarkReceived( - propagation_source.into(), - work.1.into(), - nonce.as_u32(), - )); - let _ = - swarm.behaviour_mut().event_source_sender.send(event.into()); - return; - } - }, - _ => return, - }; - - let event = P2pEvent::Channel(P2pChannelEvent::Received( - propagation_source.into(), - res, - )); - let _ = swarm.behaviour_mut().event_source_sender.send(event.into()); - } - BehaviourEvent::Rpc((peer_id, event)) => { - Self::handle_event_rpc(swarm, peer_id, event); - } - BehaviourEvent::Identify(identify::Event::Received { peer_id, info }) => { - if let Some(maddr) = info.listen_addrs.first() { - swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, maddr.clone()); - - let mut maddr = maddr.clone(); - maddr.push(libp2p::multiaddr::Protocol::P2p(peer_id)); - let _ = swarm - .behaviour_mut() - .event_source_sender - .send(P2pEvent::Libp2pIdentify(peer_id.into(), maddr).into()); - } - } - _ => { - openmina_core::log::trace!( - openmina_core::log::system_time(); - kind = "IgnoredLibp2pBehaviorEvent", - event = format!("{:?}", event) - ); - } - }, - event => { - openmina_core::log::trace!( - openmina_core::log::system_time(); - kind = "IgnoredLibp2pSwarmEvent", - event = format!("{:?}", event) - ); - } - } - } - - fn handle_event_rpc>( - swarm: &mut Swarm>, - peer_id: PeerId, - event: RpcBehaviourEvent, - ) { - let sender = swarm.behaviour_mut().event_source_sender.clone(); - let send = |event: P2pEvent| { - let _ = sender.send(event.into()); - }; - let send_error = |err: String| { - let msg = P2pEvent::Channel(P2pChannelEvent::Received(peer_id.into(), Err(err))); - let _ = sender.send(msg.into()); - }; - match event { - RpcBehaviourEvent::ConnectionClosed => { - // send(P2pConnectionEvent::Closed(peer_id.into()).into()); - } - RpcBehaviourEvent::ConnectionEstablished => { - // send(P2pConnectionEvent::Finalized(peer_id.into(), Ok(())).into()); - } - RpcBehaviourEvent::Stream { - received, - stream_id, - } => { - use libp2p_rpc_behaviour::Received; - use mina_p2p_messages::{ - rpc::{ - AnswerSyncLedgerQueryV2, GetAncestryV2, GetBestTipV2, - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2, - GetTransitionChainProofV1ForV2, GetTransitionChainV2, - }, - rpc_kernel::{ - Error as RpcError, NeedsLength, QueryHeader, QueryPayload, ResponseHeader, - ResponsePayload, RpcMethod, - }, - v2, - }; - - let ch_send = send; - let send = |msg: RpcChannelMsg| { - ch_send(P2pEvent::Channel(P2pChannelEvent::Received( - peer_id.into(), - Ok(ChannelMsg::Rpc(msg)), - ))) - }; - - fn parse_q(bytes: Vec) -> Result { - let mut bytes = bytes.as_slice(); - as BinProtRead>::binprot_read(&mut bytes) - .map(|NeedsLength(x)| x) - .map_err(|err| format!("request {} {}", M::NAME_STR, err)) - } - - fn parse_r( - bytes: Vec, - ) -> Result, String> { - let mut bytes = bytes.as_slice(); - as BinProtRead>::binprot_read(&mut bytes) - .map(|x| x.0.map(|NeedsLength(x)| x)) - .map_err(|err| format!("response {} {}", M::NAME_STR, err)) - } - - match received { - Received::Menu(_) => {} - Received::HandshakeDone => {} - Received::Query { - header: QueryHeader { tag, version, id }, - bytes, - } => { - let tag = tag.to_string_lossy(); - - swarm - .behaviour_mut() - .ongoing_incoming - .insert((peer_id, id as _), (stream_id, tag.clone(), version)); - - let send = - |request: P2pRpcRequest| send(RpcChannelMsg::Request(id as _, request)); - - match (tag.as_bytes(), version) { - (GetBestTipV2::NAME, GetBestTipV2::VERSION) => { - send(P2pRpcRequest::BestTipWithProof) - } - (GetAncestryV2::NAME, GetAncestryV2::VERSION) => { - match parse_q::(bytes) { - Ok(query) => { - // TODO (vlad9486): Check query - let _ = query.data; // must be equal current best tip consensus state - let _ = query.hash; // must be equal best_tip.data.header.protocol_state.hash() - send(P2pRpcRequest::BestTipWithProof) - } - Err(err) => send_error(err), - }; - } - (AnswerSyncLedgerQueryV2::NAME, AnswerSyncLedgerQueryV2::VERSION) => { - match parse_q::(bytes) { - Ok((hash, query)) => send(P2pRpcRequest::LedgerQuery( - v2::MinaBaseLedgerHash0StableV1(hash).into(), - query, - )), - Err(err) => send_error(err), - }; - } - ( - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME, - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION, - ) => { - type T = GetStagedLedgerAuxAndPendingCoinbasesAtHashV2; - match parse_q::(bytes) { - Ok(hash) => send( - P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock( - v2::DataHashLibStateHashStableV1(hash).into(), - ), - ), - Err(err) => send_error(err), - }; - } - (GetTransitionChainV2::NAME, GetTransitionChainV2::VERSION) => { - match parse_q::(bytes) { - Ok(hashes) => { - for hash in hashes { - send(P2pRpcRequest::Block( - v2::DataHashLibStateHashStableV1(hash).into(), - )); - } - } - Err(err) => send_error(err), - } - } - ( - GetTransitionChainProofV1ForV2::NAME, - GetTransitionChainProofV1ForV2::VERSION, - ) => swarm - .behaviour_mut() - .rpc - .respond::( - peer_id, - stream_id, - id, - Ok(None), - ) - .unwrap(), - ( - GetSomeInitialPeersV1ForV2::NAME, - GetSomeInitialPeersV1ForV2::VERSION, - ) => match parse_q::(bytes) { - Ok(()) => { - send(P2pRpcRequest::InitialPeers); - } - Err(err) => send_error(err), - }, - _ => (), - }; - } - Received::Response { - header: ResponseHeader { id }, - bytes, - } => { - let send = |response: Option| { - send(RpcChannelMsg::Response(id as _, response)) - }; - - let Some((tag, version)) = - swarm.behaviour_mut().ongoing.remove(&(peer_id, (id as _))) - else { - return; - }; - - match (tag, version) { - (GetBestTipV2::NAME, GetBestTipV2::VERSION) => { - match parse_r::(bytes) { - Ok(response) => { - let response = response - .ok() - .flatten() - .map(|resp| BestTipWithProof { - best_tip: resp.data.into(), - proof: (resp.proof.0, resp.proof.1.into()), - }) - .map(P2pRpcResponse::BestTipWithProof); - send(response) - } - Err(err) => send_error(err), - } - } - (AnswerSyncLedgerQueryV2::NAME, AnswerSyncLedgerQueryV2::VERSION) => { - match parse_r::(bytes) { - Ok(response) => { - let response = response - .ok() - .and_then(|x| x.0.ok()) - .map(P2pRpcResponse::LedgerQuery); - send(response) - } - Err(err) => send_error(err), - } - } - ( - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME, - GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION, - ) => { - type T = GetStagedLedgerAuxAndPendingCoinbasesAtHashV2; - match parse_r::(bytes) { - Ok(response) => { - let response = response - .ok() - .flatten() - .map(|(scan_state, hash, pending_coinbase, needed_blocks)| { - let staged_ledger_hash = - v2::MinaBaseLedgerHash0StableV1(hash).into(); - Arc::new(StagedLedgerAuxAndPendingCoinbases { - scan_state, - staged_ledger_hash, - pending_coinbase, - needed_blocks, - }) - }) - .map(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock); - send(response) - } - Err(err) => send_error(err), - } - } - (GetTransitionChainV2::NAME, GetTransitionChainV2::VERSION) => { - match parse_r::(bytes) { - Ok(response) => { - let response = response.ok().flatten().unwrap_or_default(); - if response.is_empty() { - send(None) - } else { - for block in response { - send(Some(P2pRpcResponse::Block(Arc::new(block)))); - } - } - } - Err(err) => send_error(err), - } - } - ( - GetSomeInitialPeersV1ForV2::NAME, - GetSomeInitialPeersV1ForV2::VERSION, - ) => { - match parse_r::(bytes) { - Ok(response) => { - let response = response.ok().unwrap_or_default(); - if response.is_empty() { - send(None) - } else { - let peers = response.into_iter().filter_map(P2pConnectionOutgoingInitOpts::try_from_mina_rpc).collect(); - send(Some(P2pRpcResponse::InitialPeers(peers))); - } - } - Err(err) => send_error(err), - } - } - _ => send(None), - } - } - } - } - } - } - - pub fn cmd_sender(&mut self) -> &mut mpsc::UnboundedSender { - &mut self.cmd_sender - } -} diff --git a/p2p/src/service_impl/webrtc/mod.rs b/p2p/src/service_impl/webrtc/mod.rs index ceaf69c756..bcfb0ccca0 100644 --- a/p2p/src/service_impl/webrtc/mod.rs +++ b/p2p/src/service_impl/webrtc/mod.rs @@ -549,7 +549,9 @@ async fn peer_loop( if msg.len() < 4 { return Err("WebRTCMessageTooSmall".to_owned()); } else { - *len = u32::from_be_bytes(msg[..4].try_into().unwrap()); + *len = u32::from_be_bytes( + msg[..4].try_into().expect("Size checked above"), + ); *msg = &msg[4..]; let len = *len as usize; if len > chan_id.max_msg_size() { diff --git a/p2p/testing/src/libp2p_node.rs b/p2p/testing/src/libp2p_node.rs index 6903df463b..c6c7e19e14 100644 --- a/p2p/testing/src/libp2p_node.rs +++ b/p2p/testing/src/libp2p_node.rs @@ -46,7 +46,9 @@ impl Libp2pNode { impl TestNode for Libp2pNode { fn peer_id(&self) -> PeerId { - (*self.swarm.local_peer_id()).try_into().unwrap() + (*self.swarm.local_peer_id()) + .try_into() + .expect("Conversion failed") } fn libp2p_port(&self) -> u16 { @@ -126,9 +128,10 @@ pub(crate) fn create_swarm( .max_transmit_size(1024 * 1024 * 32) .validate_messages() .build() - .unwrap(); + .expect("Error building gossipsub"); let mut gossipsub: gossipsub::Behaviour = - gossipsub::Behaviour::new(message_authenticity, gossipsub_config).unwrap(); + gossipsub::Behaviour::new(message_authenticity, gossipsub_config) + .expect("Error creating behaviour"); gossipsub .subscribe(&gossipsub::IdentTopic::new("coda/consensus-messages/0.0.1")) @@ -168,7 +171,7 @@ pub(crate) fn create_swarm( let swarm = libp2p::SwarmBuilder::with_existing_identity(identity_keys) .with_tokio() .with_other_transport(|key| { - let noise_config = libp2p::noise::Config::new(key).unwrap(); + let noise_config = libp2p::noise::Config::new(key).expect("Error generating noise"); let mut yamux_config = libp2p::yamux::Config::default(); yamux_config.set_protocol_name("/coda/yamux/1.0.0"); diff --git a/p2p/testing/src/predicates.rs b/p2p/testing/src/predicates.rs index 65c9317542..3762c13f7a 100644 --- a/p2p/testing/src/predicates.rs +++ b/p2p/testing/src/predicates.rs @@ -122,7 +122,7 @@ where id, event: Libp2pEvent::ConnectionEstablished { peer_id, .. }, } => { - nodes_peers.remove(&(id.into(), peer_id.try_into().unwrap())) + nodes_peers.remove(&(id.into(), peer_id.try_into().expect("Conversion failed"))) && nodes_peers.is_empty() } _ => false, diff --git a/p2p/testing/src/service.rs b/p2p/testing/src/service.rs index 24acc27a6d..5f8c040396 100644 --- a/p2p/testing/src/service.rs +++ b/p2p/testing/src/service.rs @@ -83,7 +83,9 @@ impl P2pServiceWebrtc for ClusterService { &mut self, list: &[p2p::connection::outgoing::P2pConnectionOutgoingInitOpts], ) -> p2p::connection::outgoing::P2pConnectionOutgoingInitOpts { - list.choose(&mut self.rng).unwrap().clone() + list.choose(&mut self.rng) + .expect("Error choosing random peer") + .clone() } fn event_sender(&self) -> &mpsc::UnboundedSender { diff --git a/p2p/testing/src/test_node.rs b/p2p/testing/src/test_node.rs index 53d13d4e79..ad1ed61c18 100644 --- a/p2p/testing/src/test_node.rs +++ b/p2p/testing/src/test_node.rs @@ -21,7 +21,7 @@ pub trait TestNode { } fn libp2p_dial_opts(&self, host: IpAddr) -> Multiaddr { - let peer_id: libp2p::PeerId = self.peer_id().try_into().unwrap(); + let peer_id: libp2p::PeerId = self.peer_id().try_into().expect("Conversion failed"); match host { IpAddr::V4(ip) => { diff --git a/p2p/testing/src/utils.rs b/p2p/testing/src/utils.rs index 9e99ccb6a0..6a1452337e 100644 --- a/p2p/testing/src/utils.rs +++ b/p2p/testing/src/utils.rs @@ -288,8 +288,10 @@ mod tests { .collect::>(); let [_node1, _node2] = - super::rust_nodes_from_config(&mut cluster, RustNodeConfig::default()).unwrap(); - let [node1, node2, node3] = super::rust_nodes_from_default_config(&mut cluster).unwrap(); + super::rust_nodes_from_config(&mut cluster, RustNodeConfig::default()) + .expect("Error creating nodes"); + let [node1, node2, node3] = + super::rust_nodes_from_default_config(&mut cluster).expect("Error creating nodes"); let ready = wait_for_nodes_to_listen(&mut cluster, [node1, node2, node3], Duration::from_secs(10)) diff --git a/p2p/tests/identify.rs b/p2p/tests/identify.rs index 1f19e47b5b..e81e6a6945 100644 --- a/p2p/tests/identify.rs +++ b/p2p/tests/identify.rs @@ -82,7 +82,7 @@ async fn rust_node_to_rust_node() -> anyhow::Result<()> { .connect( node, addr.clone() - .with_p2p(peer_id.try_into().unwrap()) + .with_p2p(peer_id.try_into().expect("Error converting PeerId")) .expect("no error"), ) .expect("no error"); @@ -139,7 +139,11 @@ async fn test_bad_node() -> anyhow::Result<()> { .routing_table; let bad_peer_entry = routing_table - .look_up(&bad_node_peer_id.try_into().unwrap()) + .look_up( + &bad_node_peer_id + .try_into() + .expect("PeerId conversion failed"), + ) .expect("Node not found"); let bad_peer_addresses = bad_peer_entry diff --git a/p2p/tests/kademlia.rs b/p2p/tests/kademlia.rs index 8ad8d6e3c9..65264d2a1d 100644 --- a/p2p/tests/kademlia.rs +++ b/p2p/tests/kademlia.rs @@ -304,7 +304,8 @@ async fn discovery_seed_multiple_peers() -> anyhow::Result<()> { try_wait_for_nodes_to_connect(&mut cluster, peer_ids.map(|peer_id| (node, peer_id)), dur) .await?, "all peers should be connected\n{}", - serde_json::to_string_pretty(&cluster.rust_node(node).state()).unwrap() + serde_json::to_string_pretty(&cluster.rust_node(node).state()) + .expect("Error serializing state") ); Ok(())