diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 0b4d57e9cb..d9580111d6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -284,7 +284,7 @@ jobs: needs: - k8s-peers - build-tests - # - build-tests-webrtc + - build-tests-webrtc runs-on: ubuntu-20.04 container: image: minaprotocol/mina-daemon:3.0.0-dc6bf78-focal-devnet @@ -309,6 +309,7 @@ jobs: - connection_discovery_rust_as_seed - connection_discovery_rust_to_ocaml_via_seed - connection_discovery_rust_to_ocaml + - webrtc_p2p_signaling # - webrtc_single_node # - webrtc_multi_node fail-fast: false @@ -331,6 +332,12 @@ jobs: with: pattern: tests* merge-multiple: true + + - name: Download tests + uses: actions/download-artifact@v4 + with: + pattern: tests-webrtc* + merge-multiple: true - name: Setup permissions run: | diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 0a0a0ef62a..39bda33630 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -17,7 +17,7 @@ jobs: sudo apt install -y protobuf-compiler - uses: actions-rs/toolchain@v1 with: - toolchain: 1.79 + toolchain: 1.81 components: rustfmt, clippy default: true - uses: actions-rs/cargo@v1 @@ -36,4 +36,4 @@ jobs: name: clippy with: token: ${{ secrets.GITHUB_TOKEN }} - args: --all-targets -- -D warnings + args: --all-targets -- -D warnings --allow clippy::mutable_key_type --allow clippy::result_unit_err diff --git a/Cargo.lock b/Cargo.lock index df08f427a7..f4336323b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4752,6 +4752,7 @@ dependencies = [ name = "p2p" version = "0.10.0" dependencies = [ + "aes-gcm 0.10.3", "anyhow", "binprot", "binprot_derive", @@ -4808,6 +4809,7 @@ dependencies = [ "wasm-bindgen-futures", "web-sys", "webrtc", + "x25519-dalek", "zeroize", ] @@ -8279,9 +8281,9 @@ dependencies = [ [[package]] name = "x25519-dalek" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb66477291e7e8d2b0ff1bcb900bf29489a9692816d79874bea351e7a8b6de96" +checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277" dependencies = [ "curve25519-dalek", "rand_core", diff --git a/mina-p2p-messages/benches/gossip-binprot.rs b/mina-p2p-messages/benches/gossip-binprot.rs index b757ce9206..d0e9eec8f8 100644 --- a/mina-p2p-messages/benches/gossip-binprot.rs +++ b/mina-p2p-messages/benches/gossip-binprot.rs @@ -1,3 +1,4 @@ +#![allow(unexpected_cfgs)] #![cfg(benchmarks)] #![feature(test)] diff --git a/mina-p2p-messages/tests/rpc-read.rs b/mina-p2p-messages/tests/rpc-read.rs index 31be00fc0f..64e3939165 100644 --- a/mina-p2p-messages/tests/rpc-read.rs +++ b/mina-p2p-messages/tests/rpc-read.rs @@ -158,7 +158,7 @@ fn debugger_to_wire() { "v1/rpc/get-transition-knowledge", "v1/rpc/get-ancestry", ] { - for_all_with_path(&PathBuf::from(d).join("response"), |encoded, path| { + for_all_with_path(PathBuf::from(d).join("response"), |encoded, path| { let mut p = &encoded[1..]; let tag = BinprotTag::binprot_read(&mut p).unwrap().to_string_lossy(); let ver = Ver::binprot_read(&mut p).unwrap(); diff --git a/node/common/src/service/p2p.rs b/node/common/src/service/p2p.rs index 10f1efffdd..d0093ccc0b 100644 --- a/node/common/src/service/p2p.rs +++ b/node/common/src/service/p2p.rs @@ -3,7 +3,11 @@ use std::collections::BTreeMap; use node::{ core::channels::mpsc, event_source::Event, - p2p::{connection::outgoing::P2pConnectionOutgoingInitOpts, PeerId}, + p2p::{ + connection::outgoing::P2pConnectionOutgoingInitOpts, + identity::{EncryptableType, PublicKey}, + PeerId, + }, }; use rand::prelude::*; #[cfg(feature = "p2p-libp2p")] @@ -34,6 +38,23 @@ impl webrtc::P2pServiceWebrtc for NodeService { fn peers(&mut self) -> &mut BTreeMap { &mut self.p2p.webrtc.peers } + + fn encrypt( + &mut self, + other_pk: &PublicKey, + message: &T, + ) -> Result { + let rng = &mut self.rng; + self.p2p.sec_key.encrypt(other_pk, rng, message) + } + + fn decrypt( + &mut self, + other_pk: &PublicKey, + encrypted: &T::Encrypted, + ) -> Result { + self.p2p.sec_key.decrypt(other_pk, encrypted) + } } impl webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p for NodeService { diff --git a/node/common/src/service/rpc/state.rs b/node/common/src/service/rpc/state.rs index 5fec6f009e..16e4949976 100644 --- a/node/common/src/service/rpc/state.rs +++ b/node/common/src/service/rpc/state.rs @@ -23,6 +23,16 @@ impl State { #[cfg(target_family = "wasm")] #[cfg_attr(target_family = "wasm", wasm_bindgen)] impl State { + pub async fn get(&self, filter: String) -> JsValue { + let res = self + .sender + .oneshot_request::(RpcRequest::StateGet(Some(filter))) + .await + .and_then(|v| v.ok()); + res.map(|res| JsValue::from_serde(&res).unwrap_or_default()) + .unwrap_or_default() + } + pub async fn peers(&self) -> JsValue { let res = self .sender diff --git a/node/native/src/http_server.rs b/node/native/src/http_server.rs index e195a169ba..d00660147f 100644 --- a/node/native/src/http_server.rs +++ b/node/native/src/http_server.rs @@ -65,6 +65,9 @@ pub async fn run(port: u16, rpc_sender: RpcSender) { false => StatusCode::OK, true => StatusCode::BAD_REQUEST, }, + P2pConnectionResponse::SignalDecryptionFailed => { + StatusCode::BAD_REQUEST + } P2pConnectionResponse::InternalError => { StatusCode::INTERNAL_SERVER_ERROR } diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index c16f11856f..f50049e58a 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -28,6 +28,10 @@ use crate::p2p::channels::best_tip::P2pChannelsBestTipAction; use crate::p2p::channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction; use crate::p2p::channels::rpc::P2pChannelsRpcAction; use crate::p2p::channels::rpc_effectful::P2pChannelsRpcEffectfulAction; +use crate::p2p::channels::signaling::discovery::P2pChannelsSignalingDiscoveryAction; +use crate::p2p::channels::signaling::discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction; +use crate::p2p::channels::signaling::exchange::P2pChannelsSignalingExchangeAction; +use crate::p2p::channels::signaling::exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction; use crate::p2p::channels::snark::P2pChannelsSnarkAction; use crate::p2p::channels::snark_effectful::P2pChannelsSnarkEffectfulAction; use crate::p2p::channels::snark_job_commitment::P2pChannelsSnarkJobCommitmentAction; @@ -209,6 +213,41 @@ pub enum ActionKind { P2pChannelsRpcEffectfulInit, P2pChannelsRpcEffectfulRequestSend, P2pChannelsRpcEffectfulResponseSend, + P2pChannelsSignalingDiscoveryAnswerDecrypted, + P2pChannelsSignalingDiscoveryAnswerReceived, + P2pChannelsSignalingDiscoveryAnswerSend, + P2pChannelsSignalingDiscoveryDiscoveredAccept, + P2pChannelsSignalingDiscoveryDiscoveredAcceptReceived, + P2pChannelsSignalingDiscoveryDiscoveredReceived, + P2pChannelsSignalingDiscoveryDiscoveredReject, + P2pChannelsSignalingDiscoveryDiscoveredRejectReceived, + P2pChannelsSignalingDiscoveryDiscoveredSend, + P2pChannelsSignalingDiscoveryDiscoveryRequestReceived, + P2pChannelsSignalingDiscoveryDiscoveryRequestSend, + P2pChannelsSignalingDiscoveryInit, + P2pChannelsSignalingDiscoveryPending, + P2pChannelsSignalingDiscoveryReady, + P2pChannelsSignalingDiscoveryRequestReceived, + P2pChannelsSignalingDiscoveryRequestSend, + P2pChannelsSignalingDiscoveryEffectfulAnswerDecrypt, + P2pChannelsSignalingDiscoveryEffectfulInit, + P2pChannelsSignalingDiscoveryEffectfulMessageSend, + P2pChannelsSignalingDiscoveryEffectfulOfferEncryptAndSend, + P2pChannelsSignalingExchangeAnswerReceived, + P2pChannelsSignalingExchangeAnswerSend, + P2pChannelsSignalingExchangeInit, + P2pChannelsSignalingExchangeOfferDecryptError, + P2pChannelsSignalingExchangeOfferDecryptSuccess, + P2pChannelsSignalingExchangeOfferReceived, + P2pChannelsSignalingExchangeOfferSend, + P2pChannelsSignalingExchangePending, + P2pChannelsSignalingExchangeReady, + P2pChannelsSignalingExchangeRequestReceived, + P2pChannelsSignalingExchangeRequestSend, + P2pChannelsSignalingExchangeEffectfulAnswerEncryptAndSend, + P2pChannelsSignalingExchangeEffectfulInit, + P2pChannelsSignalingExchangeEffectfulMessageSend, + P2pChannelsSignalingExchangeEffectfulOfferDecrypt, P2pChannelsSnarkInit, P2pChannelsSnarkLibp2pBroadcast, P2pChannelsSnarkLibp2pReceived, @@ -618,7 +657,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 510; + pub const COUNT: u16 = 545; } impl std::fmt::Display for ActionKind { @@ -1029,6 +1068,8 @@ impl ActionKindGet for P2pChannelsAction { fn kind(&self) -> ActionKind { match self { Self::MessageReceived(a) => a.kind(), + Self::SignalingDiscovery(a) => a.kind(), + Self::SignalingExchange(a) => a.kind(), Self::BestTip(a) => a.kind(), Self::Transaction(a) => a.kind(), Self::Snark(a) => a.kind(), @@ -1069,6 +1110,8 @@ impl ActionKindGet for P2pNetworkAction { impl ActionKindGet for P2pChannelsEffectfulAction { fn kind(&self) -> ActionKind { match self { + Self::SignalingDiscovery(a) => a.kind(), + Self::SignalingExchange(a) => a.kind(), Self::BestTip(a) => a.kind(), Self::Rpc(a) => a.kind(), Self::Snark(a) => a.kind(), @@ -1400,6 +1443,69 @@ impl ActionKindGet for P2pChannelsMessageReceivedAction { } } +impl ActionKindGet for P2pChannelsSignalingDiscoveryAction { + fn kind(&self) -> ActionKind { + match self { + Self::Init { .. } => ActionKind::P2pChannelsSignalingDiscoveryInit, + Self::Pending { .. } => ActionKind::P2pChannelsSignalingDiscoveryPending, + Self::Ready { .. } => ActionKind::P2pChannelsSignalingDiscoveryReady, + Self::RequestSend { .. } => ActionKind::P2pChannelsSignalingDiscoveryRequestSend, + Self::DiscoveryRequestReceived { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryDiscoveryRequestReceived + } + Self::DiscoveredSend { .. } => ActionKind::P2pChannelsSignalingDiscoveryDiscoveredSend, + Self::DiscoveredRejectReceived { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryDiscoveredRejectReceived + } + Self::DiscoveredAcceptReceived { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryDiscoveredAcceptReceived + } + Self::AnswerSend { .. } => ActionKind::P2pChannelsSignalingDiscoveryAnswerSend, + Self::RequestReceived { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryRequestReceived + } + Self::DiscoveryRequestSend { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryDiscoveryRequestSend + } + Self::DiscoveredReceived { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryDiscoveredReceived + } + Self::DiscoveredReject { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryDiscoveredReject + } + Self::DiscoveredAccept { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryDiscoveredAccept + } + Self::AnswerReceived { .. } => ActionKind::P2pChannelsSignalingDiscoveryAnswerReceived, + Self::AnswerDecrypted { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryAnswerDecrypted + } + } + } +} + +impl ActionKindGet for P2pChannelsSignalingExchangeAction { + fn kind(&self) -> ActionKind { + match self { + Self::Init { .. } => ActionKind::P2pChannelsSignalingExchangeInit, + Self::Pending { .. } => ActionKind::P2pChannelsSignalingExchangePending, + Self::Ready { .. } => ActionKind::P2pChannelsSignalingExchangeReady, + Self::RequestSend { .. } => ActionKind::P2pChannelsSignalingExchangeRequestSend, + Self::OfferReceived { .. } => ActionKind::P2pChannelsSignalingExchangeOfferReceived, + Self::OfferDecryptError { .. } => { + ActionKind::P2pChannelsSignalingExchangeOfferDecryptError + } + Self::OfferDecryptSuccess { .. } => { + ActionKind::P2pChannelsSignalingExchangeOfferDecryptSuccess + } + Self::AnswerSend { .. } => ActionKind::P2pChannelsSignalingExchangeAnswerSend, + Self::RequestReceived { .. } => ActionKind::P2pChannelsSignalingExchangeRequestReceived, + Self::OfferSend { .. } => ActionKind::P2pChannelsSignalingExchangeOfferSend, + Self::AnswerReceived { .. } => ActionKind::P2pChannelsSignalingExchangeAnswerReceived, + } + } +} + impl ActionKindGet for P2pChannelsBestTipAction { fn kind(&self) -> ActionKind { match self { @@ -1652,6 +1758,40 @@ impl ActionKindGet for P2pNetworkRpcAction { } } +impl ActionKindGet for P2pChannelsSignalingDiscoveryEffectfulAction { + fn kind(&self) -> ActionKind { + match self { + Self::Init { .. } => ActionKind::P2pChannelsSignalingDiscoveryEffectfulInit, + Self::MessageSend { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryEffectfulMessageSend + } + Self::OfferEncryptAndSend { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryEffectfulOfferEncryptAndSend + } + Self::AnswerDecrypt { .. } => { + ActionKind::P2pChannelsSignalingDiscoveryEffectfulAnswerDecrypt + } + } + } +} + +impl ActionKindGet for P2pChannelsSignalingExchangeEffectfulAction { + fn kind(&self) -> ActionKind { + match self { + Self::Init { .. } => ActionKind::P2pChannelsSignalingExchangeEffectfulInit, + Self::MessageSend { .. } => { + ActionKind::P2pChannelsSignalingExchangeEffectfulMessageSend + } + Self::OfferDecrypt { .. } => { + ActionKind::P2pChannelsSignalingExchangeEffectfulOfferDecrypt + } + Self::AnswerEncryptAndSend { .. } => { + ActionKind::P2pChannelsSignalingExchangeEffectfulAnswerEncryptAndSend + } + } + } +} + impl ActionKindGet for P2pChannelsBestTipEffectfulAction { fn kind(&self) -> ActionKind { match self { diff --git a/node/src/event_source/event_source_effects.rs b/node/src/event_source/event_source_effects.rs index 7b6d90d9ae..8f3c544e1b 100644 --- a/node/src/event_source/event_source_effects.rs +++ b/node/src/event_source/event_source_effects.rs @@ -1,3 +1,5 @@ +use p2p::channels::signaling::discovery::P2pChannelsSignalingDiscoveryAction; +use p2p::channels::signaling::exchange::P2pChannelsSignalingExchangeAction; use p2p::channels::snark::P2pChannelsSnarkAction; use p2p::channels::streaming_rpc::P2pChannelsStreamingRpcAction; use p2p::channels::transaction::P2pChannelsTransactionAction; @@ -158,6 +160,12 @@ pub fn event_source_effects(store: &mut Store, action: EventSourc error: P2pConnectionErrorResponse::Rejected(reason), }); } + P2pConnectionResponse::SignalDecryptionFailed => { + store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError { + peer_id, + error: P2pConnectionErrorResponse::SignalDecryptionFailed, + }); + } P2pConnectionResponse::InternalError => { store.dispatch(P2pConnectionOutgoingAction::AnswerRecvError { peer_id, @@ -194,8 +202,17 @@ pub fn event_source_effects(store: &mut Store, action: EventSourc openmina_core::log::warn!(meta.time(); kind = "P2pChannelEvent::Opened", peer_id = peer_id.to_string(), error = err); // TODO(binier): dispatch error action. } - // TODO(binier): maybe dispatch success and then ready. Ok(_) => match chan_id { + ChannelId::SignalingDiscovery => { + store.dispatch(P2pChannelsSignalingDiscoveryAction::Ready { + peer_id, + }); + } + ChannelId::SignalingExchange => { + store.dispatch(P2pChannelsSignalingExchangeAction::Ready { + peer_id, + }); + } ChannelId::BestTipPropagation => { store.dispatch(P2pChannelsBestTipAction::Ready { peer_id }); } diff --git a/node/src/logger/logger_effects.rs b/node/src/logger/logger_effects.rs index 061d269f9e..7f7a7d939a 100644 --- a/node/src/logger/logger_effects.rs +++ b/node/src/logger/logger_effects.rs @@ -69,6 +69,8 @@ pub fn logger_effects(store: &Store, action: ActionWithMetaRef<'_ P2pAction::Identify(action) => action.action_event(&context), P2pAction::Channels(action) => match action { P2pChannelsAction::MessageReceived(action) => action.action_event(&context), + P2pChannelsAction::SignalingDiscovery(action) => action.action_event(&context), + P2pChannelsAction::SignalingExchange(action) => action.action_event(&context), P2pChannelsAction::BestTip(action) => action.action_event(&context), P2pChannelsAction::Transaction(action) => action.action_event(&context), P2pChannelsAction::Snark(action) => action.action_event(&context), @@ -105,6 +107,12 @@ pub fn logger_effects(store: &Store, action: ActionWithMetaRef<'_ }, Action::P2pEffectful(action) => match action { p2p::P2pEffectfulAction::Channels(action) => match action { + P2pChannelsEffectfulAction::SignalingDiscovery(action) => { + action.action_event(&context) + } + P2pChannelsEffectfulAction::SignalingExchange(action) => { + action.action_event(&context) + } P2pChannelsEffectfulAction::BestTip(action) => action.action_event(&context), P2pChannelsEffectfulAction::Rpc(action) => action.action_event(&context), P2pChannelsEffectfulAction::StreamingRpc(action) => action.action_event(&context), diff --git a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs index fcabd0d449..2b24054fba 100644 --- a/node/src/p2p/callbacks/p2p_callbacks_reducer.rs +++ b/node/src/p2p/callbacks/p2p_callbacks_reducer.rs @@ -45,12 +45,16 @@ impl crate::State { P2pCallbacksAction::P2pChannelsRpcReady { peer_id } => { let peer_id = *peer_id; - dispatcher.push(P2pChannelsRpcAction::RequestSend { - peer_id, - id: 0, - request: Box::new(P2pRpcRequest::BestTipWithProof), - on_init: None, - }); + if state.p2p.get_peer(&peer_id).map_or(false, |p| p.is_libp2p) { + // for webrtc peers, we don't need to send this rpc, as we + // will receive current best tip in best tip channel anyways. + dispatcher.push(P2pChannelsRpcAction::RequestSend { + peer_id, + id: 0, + request: Box::new(P2pRpcRequest::BestTipWithProof), + on_init: None, + }); + } dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery); dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit); @@ -283,7 +287,9 @@ impl crate::State { let response = None.or_else(|| { let best_tip = best_chain.last()?; let mut chain_iter = best_chain.iter(); - let root_block = chain_iter.next()?; + let root_block = chain_iter.next(); + // when our best tip is genesis block. + let root_block = root_block.unwrap_or(best_tip); // TODO(binier): cache body hashes let Ok(body_hashes) = chain_iter .map(|b| b.header().protocol_state.body.try_hash()) diff --git a/node/src/p2p/channels/mod.rs b/node/src/p2p/channels/mod.rs index 471c7e6640..74cb3c3e70 100644 --- a/node/src/p2p/channels/mod.rs +++ b/node/src/p2p/channels/mod.rs @@ -2,6 +2,7 @@ pub use ::p2p::channels::*; pub mod best_tip; pub mod rpc; +pub mod signaling; pub mod snark; pub mod snark_job_commitment; pub mod streaming_rpc; diff --git a/node/src/p2p/channels/signaling/mod.rs b/node/src/p2p/channels/signaling/mod.rs new file mode 100644 index 0000000000..77951eb36c --- /dev/null +++ b/node/src/p2p/channels/signaling/mod.rs @@ -0,0 +1,4 @@ +pub use ::p2p::channels::signaling::*; + +mod p2p_channels_signaling_discovery_actions; +mod p2p_channels_signaling_exchange_actions; diff --git a/node/src/p2p/channels/signaling/p2p_channels_signaling_discovery_actions.rs b/node/src/p2p/channels/signaling/p2p_channels_signaling_discovery_actions.rs new file mode 100644 index 0000000000..2540e8e534 --- /dev/null +++ b/node/src/p2p/channels/signaling/p2p_channels_signaling_discovery_actions.rs @@ -0,0 +1,7 @@ +use super::discovery::*; + +impl redux::EnablingCondition for P2pChannelsSignalingDiscoveryAction { + fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool { + state.p2p.is_enabled(self, time) + } +} diff --git a/node/src/p2p/channels/signaling/p2p_channels_signaling_exchange_actions.rs b/node/src/p2p/channels/signaling/p2p_channels_signaling_exchange_actions.rs new file mode 100644 index 0000000000..0861667ea3 --- /dev/null +++ b/node/src/p2p/channels/signaling/p2p_channels_signaling_exchange_actions.rs @@ -0,0 +1,7 @@ +use super::exchange::*; + +impl redux::EnablingCondition for P2pChannelsSignalingExchangeAction { + fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool { + state.p2p.is_enabled(self, time) + } +} diff --git a/node/src/p2p/mod.rs b/node/src/p2p/mod.rs index 0e5f50a4cf..06d72936dc 100644 --- a/node/src/p2p/mod.rs +++ b/node/src/p2p/mod.rs @@ -3,6 +3,10 @@ use p2p::{ channels::{ best_tip_effectful::P2pChannelsBestTipEffectfulAction, rpc_effectful::P2pChannelsRpcEffectfulAction, + signaling::{ + discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, + exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, + }, snark_effectful::P2pChannelsSnarkEffectfulAction, snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction, @@ -107,6 +111,8 @@ impl_into_global_action!(network::kad::P2pNetworkKademliaAction); impl_into_global_action!(network::pubsub::P2pNetworkPubsubAction); impl_into_global_action!(channels::P2pChannelsMessageReceivedAction); +impl_into_global_action!(channels::signaling::discovery::P2pChannelsSignalingDiscoveryAction); +impl_into_global_action!(channels::signaling::exchange::P2pChannelsSignalingExchangeAction); impl_into_global_action!(channels::best_tip::P2pChannelsBestTipAction); impl_into_global_action!(channels::transaction::P2pChannelsTransactionAction); impl_into_global_action!(channels::snark::P2pChannelsSnarkAction); @@ -132,6 +138,10 @@ impl_into_global_action!(effectful p2p::P2pNetworkPnetEffectfulAction); impl_into_global_action!(effectful connection::incoming_effectful::P2pConnectionIncomingEffectfulAction); impl_into_global_action!(effectful connection::outgoing_effectful::P2pConnectionOutgoingEffectfulAction); impl_into_global_action!(effectful p2p::disconnection_effectful::P2pDisconnectionEffectfulAction); +impl_into_global_action!( + effectful P2pChannelsSignalingDiscoveryEffectfulAction +); +impl_into_global_action!(effectful P2pChannelsSignalingExchangeEffectfulAction); impl_into_global_action!(effectful P2pChannelsBestTipEffectfulAction); impl_into_global_action!(effectful P2pChannelsStreamingRpcEffectfulAction); impl_into_global_action!(effectful P2pChannelsTransactionEffectfulAction); diff --git a/node/src/rpc/rpc_effects.rs b/node/src/rpc/rpc_effects.rs index ea15efaf89..04cb8d1a95 100644 --- a/node/src/rpc/rpc_effects.rs +++ b/node/src/rpc/rpc_effects.rs @@ -293,8 +293,11 @@ pub fn rpc_effects(store: &mut Store, action: RpcActionWithMeta) RpcAction::P2pConnectionIncomingRespond { rpc_id, response } => { let error = match &response { P2pConnectionResponse::Accepted(_) => None, - P2pConnectionResponse::InternalError => Some("RemoteInternalError".to_owned()), P2pConnectionResponse::Rejected(reason) => Some(format!("Rejected({:?})", reason)), + P2pConnectionResponse::SignalDecryptionFailed => { + Some("RemoteSignalDecryptionFailed".to_owned()) + } + P2pConnectionResponse::InternalError => Some("RemoteInternalError".to_owned()), }; let _ = store .service diff --git a/node/testing/src/node/ocaml/mod.rs b/node/testing/src/node/ocaml/mod.rs index 3266b36a33..a6a8e58804 100644 --- a/node/testing/src/node/ocaml/mod.rs +++ b/node/testing/src/node/ocaml/mod.rs @@ -106,7 +106,7 @@ impl OcamlNode { cmd.arg("daemon"); cmd.arg("--config-dir").arg(&config_dir); - cmd.arg("--libp2p-keypair").arg(&Self::privkey_path(dir)); + cmd.arg("--libp2p-keypair").arg(Self::privkey_path(dir)); cmd.args(["--external-ip", "127.0.0.1"]) .args(["--external-port", &config.libp2p_port.to_string()]) .args(["--client-port", &config.client_port.to_string()]) diff --git a/node/testing/src/scenarios/mod.rs b/node/testing/src/scenarios/mod.rs index 7fccead6ef..867a1f7464 100644 --- a/node/testing/src/scenarios/mod.rs +++ b/node/testing/src/scenarios/mod.rs @@ -2,6 +2,7 @@ //! Initial Joining: //! * Ensure new nodes can discover peers and establish initial connections. //! * Test how nodes handle scenarios when they are overwhelmed with too many connections or data requests. +//! //! TODO(vlad9486): //! Reconnection: Validate that nodes can reconnect after both intentional and unintentional disconnections. //! Handling Latency: Nodes should remain connected and synchronize even under high latency conditions. @@ -17,6 +18,7 @@ pub mod p2p; mod driver; pub use driver::*; +use p2p::signaling::P2pSignaling; pub use crate::cluster::runner::*; @@ -65,6 +67,7 @@ pub enum Scenarios { SimulationSmall(SimulationSmall), SimulationSmallForeverRealTime(SimulationSmallForeverRealTime), P2pReceiveBlock(P2pReceiveBlock), + P2pSignaling(P2pSignaling), MultiNodePubsubPropagateBlock(MultiNodePubsubPropagateBlock), RecordReplayBootstrap(RecordReplayBootstrap), RecordReplayBlockProduction(RecordReplayBlockProduction), @@ -89,6 +92,7 @@ impl Scenarios { Self::SimulationSmall(_) => true, Self::SimulationSmallForeverRealTime(_) => true, Self::MultiNodePubsubPropagateBlock(_) => true, // in progress + Self::P2pSignaling(_) => cfg!(feature = "p2p-webrtc"), _ => false, } } @@ -147,6 +151,7 @@ impl Scenarios { Self::SimulationSmall(_) => SimulationSmall::DOCS, Self::SimulationSmallForeverRealTime(_) => SimulationSmallForeverRealTime::DOCS, Self::P2pReceiveBlock(_) => P2pReceiveBlock::DOCS, + Self::P2pSignaling(_) => P2pSignaling::DOCS, Self::MultiNodePubsubPropagateBlock(_) => MultiNodePubsubPropagateBlock::DOCS, Self::RecordReplayBootstrap(_) => RecordReplayBootstrap::DOCS, Self::RecordReplayBlockProduction(_) => RecordReplayBlockProduction::DOCS, @@ -183,6 +188,7 @@ impl Scenarios { Self::SimulationSmall(v) => v.run(runner).await, Self::SimulationSmallForeverRealTime(v) => v.run(runner).await, Self::P2pReceiveBlock(v) => v.run(runner).await, + Self::P2pSignaling(v) => v.run(runner).await, Self::MultiNodePubsubPropagateBlock(v) => v.run(runner).await, Self::RecordReplayBootstrap(v) => v.run(runner).await, Self::RecordReplayBlockProduction(v) => v.run(runner).await, diff --git a/node/testing/src/scenarios/p2p/mod.rs b/node/testing/src/scenarios/p2p/mod.rs index c12f2ec263..a4000cd92a 100644 --- a/node/testing/src/scenarios/p2p/mod.rs +++ b/node/testing/src/scenarios/p2p/mod.rs @@ -3,3 +3,4 @@ pub mod basic_incoming_connections; pub mod basic_outgoing_connections; pub mod kademlia; pub mod pubsub; +pub mod signaling; diff --git a/node/testing/src/scenarios/p2p/signaling.rs b/node/testing/src/scenarios/p2p/signaling.rs new file mode 100644 index 0000000000..5124238d6a --- /dev/null +++ b/node/testing/src/scenarios/p2p/signaling.rs @@ -0,0 +1,60 @@ +use std::{collections::BTreeSet, time::Duration}; + +use node::{ + p2p::{P2pPeerAction, PeerId}, + Action, P2pAction, +}; + +use crate::{ + node::RustNodeTestingConfig, + scenarios::{ClusterRunner, DynEffectsData, RunCfg}, +}; + +/// Makes sure that when using WebRTC only nodes, peers can discover +/// each other and connect to each other via p2p signaling. +#[derive(documented::Documented, Default, Clone, Copy)] +pub struct P2pSignaling; + +impl P2pSignaling { + pub async fn run(self, mut runner: ClusterRunner<'_>) { + const NODES_N: usize = 4; + + let seed_config = RustNodeTestingConfig::devnet_default(); + let seed = runner.add_rust_node(seed_config.clone()); + + let node_config = seed_config.initial_peers(vec![seed.into()]); + let _node_1 = runner.add_rust_node(node_config.clone()); + let _node_2 = runner.add_rust_node(node_config.clone()); + let _node_3 = runner.add_rust_node(node_config.clone()); + + let node_peers: [_; NODES_N] = std::array::from_fn(|_| BTreeSet::::new()); + let node_peers = DynEffectsData::new(node_peers); + + runner + .run( + RunCfg::default() + .timeout(Duration::from_secs(60)) + .advance_time(1..=100) + .action_handler(move |node_id, _state, _, action| { + if action.action().kind().to_string().contains("Signaling") { + let me = _state.p2p.my_id(); + let me_pk = me.to_public_key().unwrap(); + dbg!((me, me_pk, action.action())); + } + match action.action() { + Action::P2p(P2pAction::Peer(P2pPeerAction::Ready { + peer_id, .. + })) => { + node_peers.inner()[node_id.index()].insert(*peer_id); + dbg!(node_peers.inner()) + .iter() + .all(|v| v.len() == NODES_N - 1) + } + _ => false, + } + }), + ) + .await + .expect("peers didn't discover each other"); + } +} diff --git a/node/testing/src/service/mod.rs b/node/testing/src/service/mod.rs index 65bd6ea09b..4880bed8d5 100644 --- a/node/testing/src/service/mod.rs +++ b/node/testing/src/service/mod.rs @@ -359,6 +359,22 @@ impl P2pServiceWebrtc for NodeTestingService { fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) { P2pServiceWebrtc::incoming_init(&mut self.real, peer_id, offer) } + + fn encrypt( + &mut self, + other_pk: &node::p2p::identity::PublicKey, + message: &T, + ) -> Result { + self.real.encrypt(other_pk, message) + } + + fn decrypt( + &mut self, + other_pub_key: &node::p2p::identity::PublicKey, + encrypted: &T::Encrypted, + ) -> Result { + self.real.decrypt(other_pub_key, encrypted) + } } impl P2pServiceWebrtcWithLibp2p for NodeTestingService { diff --git a/node/testing/tests/p2p_signaling.rs b/node/testing/tests/p2p_signaling.rs new file mode 100644 index 0000000000..ce17eac88c --- /dev/null +++ b/node/testing/tests/p2p_signaling.rs @@ -0,0 +1,7 @@ +#![cfg(feature = "p2p-webrtc")] + +use openmina_node_testing::scenarios::p2p::signaling::P2pSignaling; + +mod common; + +scenario_test!(p2p_signaling, P2pSignaling, P2pSignaling, true); diff --git a/node/web/src/lib.rs b/node/web/src/lib.rs index 2fc576ae16..c0f53874b1 100644 --- a/node/web/src/lib.rs +++ b/node/web/src/lib.rs @@ -24,7 +24,7 @@ fn main() { thread::main_thread_init(); wasm_bindgen_futures::spawn_local(async { console_error_panic_hook::set_once(); - tracing::initialize(tracing::Level::INFO); + tracing::initialize(tracing::Level::DEBUG); init_rayon().await.unwrap(); }); @@ -70,7 +70,6 @@ async fn setup_node( } node_builder - .p2p_no_discovery() .p2p_custom_task_spawner(P2pTaskRemoteSpawner {}) .unwrap(); node_builder.gather_stats(); diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 179d8aa67e..39012b850b 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -23,8 +23,9 @@ cfg-if = "1.0.0" url = "2.3.1" multihash = "0.18.1" sha2 = "0.10.6" -# ecies-ed25519 = "0.5.1" ed25519-dalek = { version = "2.1.1", features = ["serde"] } +x25519-dalek = { version = "2.0.1", features = ["static_secrets"] } +aes-gcm = "0.10.3" faster-stun = { version = "1.0.1", optional = true } reqwest = { version = "0.11.22", optional = true } unsigned-varint = { version = "0.8.0" } diff --git a/p2p/src/channels/mod.rs b/p2p/src/channels/mod.rs index 27d3d84c4b..78b35cf3b3 100644 --- a/p2p/src/channels/mod.rs +++ b/p2p/src/channels/mod.rs @@ -2,6 +2,7 @@ pub mod best_tip; pub mod best_tip_effectful; pub mod rpc; pub mod rpc_effectful; +pub mod signaling; pub mod snark; pub mod snark_effectful; pub mod snark_job_commitment; @@ -26,6 +27,8 @@ use binprot::{BinProtRead, BinProtWrite}; use binprot_derive::{BinProtRead, BinProtWrite}; use derive_more::From; use serde::{Deserialize, Serialize}; +use signaling::discovery::SignalingDiscoveryChannelMsg; +use signaling::exchange::SignalingExchangeChannelMsg; use strum_macros::EnumIter; use self::best_tip::BestTipPropagationChannelMsg; @@ -38,12 +41,14 @@ use self::transaction::TransactionPropagationChannelMsg; #[derive(Serialize, Deserialize, EnumIter, Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)] #[repr(u8)] pub enum ChannelId { - BestTipPropagation = 2, - TransactionPropagation = 3, - SnarkPropagation = 4, - SnarkJobCommitmentPropagation = 5, - Rpc = 100, - StreamingRpc = 101, + SignalingDiscovery = 1, + SignalingExchange = 2, + BestTipPropagation = 3, + TransactionPropagation = 4, + SnarkPropagation = 5, + SnarkJobCommitmentPropagation = 6, + Rpc = 7, + StreamingRpc = 8, } impl ChannelId { @@ -59,6 +64,8 @@ impl ChannelId { pub fn name(self) -> &'static str { match self { + Self::SignalingDiscovery => "signaling/discovery", + Self::SignalingExchange => "signaling/exchange", Self::BestTipPropagation => "best_tip/propagation", Self::TransactionPropagation => "transaction/propagation", Self::SnarkPropagation => "snark/propagation", @@ -70,6 +77,8 @@ impl ChannelId { pub fn supported_by_libp2p(self) -> bool { match self { + Self::SignalingDiscovery => false, + Self::SignalingExchange => false, Self::BestTipPropagation => true, Self::TransactionPropagation => true, Self::SnarkPropagation => true, @@ -81,6 +90,9 @@ impl ChannelId { pub fn max_msg_size(self) -> usize { match self { + // TODO(binier): measure signaling message sizes + Self::SignalingDiscovery => 16 * 1024, // 16KB + Self::SignalingExchange => 16 * 1024, // 16KB // TODO(binier): reduce this value once we change message for best tip // propagation to just propagating consensus state with block hash. Self::BestTipPropagation => 32 * 1024 * 1024, // 32MB @@ -122,6 +134,8 @@ impl MsgId { #[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)] pub enum ChannelMsg { + SignalingDiscovery(SignalingDiscoveryChannelMsg), + SignalingExchange(SignalingExchangeChannelMsg), BestTipPropagation(BestTipPropagationChannelMsg), TransactionPropagation(TransactionPropagationChannelMsg), SnarkPropagation(SnarkPropagationChannelMsg), @@ -133,6 +147,8 @@ pub enum ChannelMsg { impl ChannelMsg { pub fn channel_id(&self) -> ChannelId { match self { + Self::SignalingDiscovery(_) => ChannelId::SignalingDiscovery, + Self::SignalingExchange(_) => ChannelId::SignalingExchange, Self::BestTipPropagation(_) => ChannelId::BestTipPropagation, Self::TransactionPropagation(_) => ChannelId::TransactionPropagation, Self::SnarkPropagation(_) => ChannelId::SnarkPropagation, @@ -147,6 +163,8 @@ impl ChannelMsg { W: std::io::Write, { match self { + Self::SignalingDiscovery(v) => v.binprot_write(w), + Self::SignalingExchange(v) => v.binprot_write(w), Self::BestTipPropagation(v) => v.binprot_write(w), Self::TransactionPropagation(v) => v.binprot_write(w), Self::SnarkPropagation(v) => v.binprot_write(w), @@ -162,6 +180,12 @@ impl ChannelMsg { R: std::io::Read + ?Sized, { match id { + ChannelId::SignalingDiscovery => { + SignalingDiscoveryChannelMsg::binprot_read(r).map(|v| v.into()) + } + ChannelId::SignalingExchange => { + SignalingExchangeChannelMsg::binprot_read(r).map(|v| v.into()) + } ChannelId::BestTipPropagation => { BestTipPropagationChannelMsg::binprot_read(r).map(|v| v.into()) } @@ -194,6 +218,16 @@ impl crate::P2pState { // exhaustive matching so that we don't miss any channels. for id in self.config.enabled_channels.iter().copied() { match id { + ChannelId::SignalingDiscovery => { + dispatcher.push( + signaling::discovery::P2pChannelsSignalingDiscoveryAction::Init { peer_id }, + ); + } + ChannelId::SignalingExchange => { + dispatcher.push( + signaling::exchange::P2pChannelsSignalingExchangeAction::Init { peer_id }, + ); + } ChannelId::BestTipPropagation => { dispatcher.push(best_tip::P2pChannelsBestTipAction::Init { peer_id }); } diff --git a/p2p/src/channels/p2p_channels_actions.rs b/p2p/src/channels/p2p_channels_actions.rs index 30f598ae35..30e9ec7065 100644 --- a/p2p/src/channels/p2p_channels_actions.rs +++ b/p2p/src/channels/p2p_channels_actions.rs @@ -4,20 +4,32 @@ use serde::{Deserialize, Serialize}; use crate::{P2pState, PeerId}; use super::{ - best_tip::P2pChannelsBestTipAction, best_tip_effectful::P2pChannelsBestTipEffectfulAction, - rpc::P2pChannelsRpcAction, rpc_effectful::P2pChannelsRpcEffectfulAction, - snark::P2pChannelsSnarkAction, snark_effectful::P2pChannelsSnarkEffectfulAction, + best_tip::P2pChannelsBestTipAction, + best_tip_effectful::P2pChannelsBestTipEffectfulAction, + rpc::P2pChannelsRpcAction, + rpc_effectful::P2pChannelsRpcEffectfulAction, + signaling::{ + discovery::P2pChannelsSignalingDiscoveryAction, + discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, + exchange::P2pChannelsSignalingExchangeAction, + exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, + }, + snark::P2pChannelsSnarkAction, + snark_effectful::P2pChannelsSnarkEffectfulAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, streaming_rpc::P2pChannelsStreamingRpcAction, streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction, transaction::P2pChannelsTransactionAction, - transaction_effectful::P2pChannelsTransactionEffectfulAction, ChannelMsg, + transaction_effectful::P2pChannelsTransactionEffectfulAction, + ChannelMsg, }; #[derive(Serialize, Deserialize, Debug, Clone, openmina_core::ActionEvent)] pub enum P2pChannelsAction { MessageReceived(P2pChannelsMessageReceivedAction), + SignalingDiscovery(P2pChannelsSignalingDiscoveryAction), + SignalingExchange(P2pChannelsSignalingExchangeAction), BestTip(P2pChannelsBestTipAction), Transaction(P2pChannelsTransactionAction), Snark(P2pChannelsSnarkAction), @@ -28,6 +40,8 @@ pub enum P2pChannelsAction { #[derive(Serialize, Deserialize, Debug, Clone, openmina_core::ActionEvent)] pub enum P2pChannelsEffectfulAction { + SignalingDiscovery(P2pChannelsSignalingDiscoveryEffectfulAction), + SignalingExchange(P2pChannelsSignalingExchangeEffectfulAction), BestTip(P2pChannelsBestTipEffectfulAction), Rpc(P2pChannelsRpcEffectfulAction), Snark(P2pChannelsSnarkEffectfulAction), @@ -40,6 +54,8 @@ impl P2pChannelsAction { pub fn peer_id(&self) -> Option<&PeerId> { match self { Self::MessageReceived(v) => Some(&v.peer_id), + Self::SignalingDiscovery(v) => Some(v.peer_id()), + Self::SignalingExchange(v) => Some(v.peer_id()), Self::BestTip(v) => Some(v.peer_id()), Self::Transaction(v) => v.peer_id(), Self::Snark(v) => v.peer_id(), @@ -54,6 +70,8 @@ impl redux::EnablingCondition for P2pChannelsAction { fn is_enabled(&self, state: &crate::P2pState, time: redux::Timestamp) -> bool { match self { P2pChannelsAction::MessageReceived(a) => a.is_enabled(state, time), + P2pChannelsAction::SignalingDiscovery(a) => a.is_enabled(state, time), + P2pChannelsAction::SignalingExchange(a) => a.is_enabled(state, time), P2pChannelsAction::Transaction(a) => a.is_enabled(state, time), P2pChannelsAction::BestTip(a) => a.is_enabled(state, time), P2pChannelsAction::Snark(a) => a.is_enabled(state, time), @@ -67,6 +85,8 @@ impl redux::EnablingCondition for P2pChannelsAction { impl redux::EnablingCondition for P2pChannelsEffectfulAction { fn is_enabled(&self, state: &crate::P2pState, time: redux::Timestamp) -> bool { match self { + P2pChannelsEffectfulAction::SignalingDiscovery(a) => a.is_enabled(state, time), + P2pChannelsEffectfulAction::SignalingExchange(a) => a.is_enabled(state, time), P2pChannelsEffectfulAction::BestTip(a) => a.is_enabled(state, time), P2pChannelsEffectfulAction::Transaction(a) => a.is_enabled(state, time), P2pChannelsEffectfulAction::StreamingRpc(a) => a.is_enabled(state, time), diff --git a/p2p/src/channels/p2p_channels_reducer.rs b/p2p/src/channels/p2p_channels_reducer.rs index 7bdc200507..ecf2ce99a7 100644 --- a/p2p/src/channels/p2p_channels_reducer.rs +++ b/p2p/src/channels/p2p_channels_reducer.rs @@ -1,6 +1,16 @@ use super::{ best_tip::{BestTipPropagationChannelMsg, P2pChannelsBestTipAction, P2pChannelsBestTipState}, rpc::{P2pChannelsRpcAction, P2pChannelsRpcState, RpcChannelMsg}, + signaling::{ + discovery::{ + P2pChannelsSignalingDiscoveryAction, P2pChannelsSignalingDiscoveryState, + SignalingDiscoveryChannelMsg, + }, + exchange::{ + P2pChannelsSignalingExchangeAction, P2pChannelsSignalingExchangeState, + SignalingExchangeChannelMsg, + }, + }, snark::{P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationChannelMsg}, snark_job_commitment::{ P2pChannelsSnarkJobCommitmentAction, P2pChannelsSnarkJobCommitmentState, @@ -37,6 +47,12 @@ impl P2pChannelsState { let (dispatcher, state) = state_context.into_dispatcher_and_state(); Self::dispatch_message(meta.with_action(action), dispatcher, state) } + P2pChannelsAction::SignalingDiscovery(action) => { + P2pChannelsSignalingDiscoveryState::reducer(state_context, meta.with_action(action)) + } + P2pChannelsAction::SignalingExchange(action) => { + P2pChannelsSignalingExchangeState::reducer(state_context, meta.with_action(action)) + } P2pChannelsAction::BestTip(action) => { P2pChannelsBestTipState::reducer(state_context, meta.with_action(action)) } @@ -76,6 +92,55 @@ impl P2pChannelsState { let mut is_enabled = |action: Action| dispatcher.push_if_enabled(action, state, time); let was_expected = match *action.message.clone() { + ChannelMsg::SignalingDiscovery(msg) => match msg { + SignalingDiscoveryChannelMsg::GetNext => is_enabled( + P2pChannelsSignalingDiscoveryAction::RequestReceived { peer_id }.into(), + ), + SignalingDiscoveryChannelMsg::Discover => is_enabled( + P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived { peer_id } + .into(), + ), + SignalingDiscoveryChannelMsg::Discovered { target_public_key } => is_enabled( + P2pChannelsSignalingDiscoveryAction::DiscoveredReceived { + peer_id, + target_public_key, + } + .into(), + ), + SignalingDiscoveryChannelMsg::DiscoveredReject => is_enabled( + P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { peer_id } + .into(), + ), + SignalingDiscoveryChannelMsg::DiscoveredAccept(offer) => is_enabled( + P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived { + peer_id, + offer, + } + .into(), + ), + SignalingDiscoveryChannelMsg::Answer(answer) => is_enabled( + P2pChannelsSignalingDiscoveryAction::AnswerReceived { peer_id, answer }.into(), + ), + }, + ChannelMsg::SignalingExchange(msg) => match msg { + SignalingExchangeChannelMsg::GetNext => is_enabled( + P2pChannelsSignalingExchangeAction::RequestReceived { peer_id }.into(), + ), + SignalingExchangeChannelMsg::OfferToYou { + offerer_pub_key, + offer, + } => is_enabled( + P2pChannelsSignalingExchangeAction::OfferReceived { + peer_id, + offerer_pub_key, + offer, + } + .into(), + ), + SignalingExchangeChannelMsg::Answer(answer) => is_enabled( + P2pChannelsSignalingExchangeAction::AnswerReceived { peer_id, answer }.into(), + ), + }, ChannelMsg::BestTipPropagation(msg) => match msg { BestTipPropagationChannelMsg::GetNext => { is_enabled(P2pChannelsBestTipAction::RequestReceived { peer_id }.into()) @@ -201,6 +266,7 @@ impl P2pChannelsState { }; if !was_expected { + // dbg!(&action.message); let reason = P2pDisconnectionReason::P2pChannelMsgUnexpected(chain_id); dispatcher.push(P2pDisconnectionAction::Init { peer_id, reason }); } diff --git a/p2p/src/channels/p2p_channels_service.rs b/p2p/src/channels/p2p_channels_service.rs index 1c34914d5a..38a6eae8d7 100644 --- a/p2p/src/channels/p2p_channels_service.rs +++ b/p2p/src/channels/p2p_channels_service.rs @@ -1,8 +1,21 @@ -use crate::PeerId; +use crate::{ + identity::{EncryptableType, PublicKey}, + PeerId, +}; use super::{ChannelId, ChannelMsg, MsgId}; pub trait P2pChannelsService: redux::Service { fn channel_open(&mut self, peer_id: PeerId, id: ChannelId); fn channel_send(&mut self, peer_id: PeerId, msg_id: MsgId, msg: ChannelMsg); + fn encrypt( + &mut self, + other_pk: &PublicKey, + message: &T, + ) -> Result; + fn decrypt( + &mut self, + other_pk: &PublicKey, + encrypted: &T::Encrypted, + ) -> Result; } diff --git a/p2p/src/channels/p2p_channels_state.rs b/p2p/src/channels/p2p_channels_state.rs index 4134672f67..30df02cd9d 100644 --- a/p2p/src/channels/p2p_channels_state.rs +++ b/p2p/src/channels/p2p_channels_state.rs @@ -5,6 +5,10 @@ use serde::{Deserialize, Serialize}; use super::{ best_tip::P2pChannelsBestTipState, rpc::{P2pChannelsRpcState, P2pRpcId}, + signaling::{ + discovery::P2pChannelsSignalingDiscoveryState, exchange::P2pChannelsSignalingExchangeState, + P2pChannelsSignalingState, + }, snark::P2pChannelsSnarkState, snark_job_commitment::P2pChannelsSnarkJobCommitmentState, streaming_rpc::P2pChannelsStreamingRpcState, @@ -14,6 +18,7 @@ use super::{ #[derive(Serialize, Deserialize, Debug, Clone)] pub struct P2pChannelsState { + pub signaling: P2pChannelsSignalingState, pub best_tip: P2pChannelsBestTipState, pub transaction: P2pChannelsTransactionState, pub snark: P2pChannelsSnarkState, @@ -27,6 +32,16 @@ pub struct P2pChannelsState { impl P2pChannelsState { pub fn new(enabled_channels: &BTreeSet) -> Self { Self { + signaling: P2pChannelsSignalingState { + discovery: match enabled_channels.contains(&ChannelId::SignalingDiscovery) { + false => P2pChannelsSignalingDiscoveryState::Disabled, + true => P2pChannelsSignalingDiscoveryState::Enabled, + }, + exchange: match enabled_channels.contains(&ChannelId::SignalingExchange) { + false => P2pChannelsSignalingExchangeState::Disabled, + true => P2pChannelsSignalingExchangeState::Enabled, + }, + }, best_tip: match enabled_channels.contains(&ChannelId::BestTipPropagation) { false => P2pChannelsBestTipState::Disabled, true => P2pChannelsBestTipState::Enabled, @@ -73,6 +88,8 @@ impl P2pChannelsState { impl P2pChannelsState { pub fn is_channel_ready(&self, chan_id: ChannelId) -> bool { match chan_id { + ChannelId::SignalingDiscovery => self.signaling.discovery.is_ready(), + ChannelId::SignalingExchange => self.signaling.exchange.is_ready(), ChannelId::BestTipPropagation => self.best_tip.is_ready(), ChannelId::TransactionPropagation => self.transaction.is_ready(), ChannelId::SnarkPropagation => self.snark.is_ready(), diff --git a/p2p/src/channels/signaling/discovery/mod.rs b/p2p/src/channels/signaling/discovery/mod.rs new file mode 100644 index 0000000000..112a35c2c3 --- /dev/null +++ b/p2p/src/channels/signaling/discovery/mod.rs @@ -0,0 +1,33 @@ +mod p2p_channels_signaling_discovery_state; +pub use p2p_channels_signaling_discovery_state::*; + +mod p2p_channels_signaling_discovery_actions; +pub use p2p_channels_signaling_discovery_actions::*; + +mod p2p_channels_signaling_discovery_reducer; + +use binprot_derive::{BinProtRead, BinProtWrite}; +use serde::{Deserialize, Serialize}; + +use crate::{ + identity::PublicKey, + webrtc::{EncryptedAnswer, EncryptedOffer}, +}; + +#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)] +pub enum SignalingDiscoveryChannelMsg { + /// Get next request for connecting 2 peers to each other. + GetNext, + /// Dialer is asking relayer to find available connected peer + /// and start signaling with it. + Discover, + /// Relayer found available connected peer and ready to facilitate + /// signaling. + Discovered { target_public_key: PublicKey }, + /// Dialer rejected target peer. + DiscoveredReject, + /// Dialer accepted target peer and wants to initiate signaling. + DiscoveredAccept(EncryptedOffer), + /// Relayed answer Answer to dialer to relay, if you aren't dialer. + Answer(Option), +} diff --git a/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_actions.rs b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_actions.rs new file mode 100644 index 0000000000..66f0a3b0b1 --- /dev/null +++ b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_actions.rs @@ -0,0 +1,277 @@ +use openmina_core::ActionEvent; +use serde::{Deserialize, Serialize}; + +use crate::{ + channels::P2pChannelsAction, + connection::P2pConnectionResponse, + identity::PublicKey, + webrtc::{EncryptedAnswer, EncryptedOffer, Offer}, + P2pState, PeerId, +}; + +use super::{P2pChannelsSignalingDiscoveryState, SignalingDiscoveryState}; + +#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] +#[action_event(fields(display(peer_id)))] +pub enum P2pChannelsSignalingDiscoveryAction { + /// Initialize channel. + Init { + peer_id: PeerId, + }, + Pending { + peer_id: PeerId, + }, + /// Channel is ready. + Ready { + peer_id: PeerId, + }, + /// Send request to get next peer discovery request from peer. + RequestSend { + peer_id: PeerId, + }, + DiscoveryRequestReceived { + peer_id: PeerId, + }, + DiscoveredSend { + peer_id: PeerId, + target_public_key: PublicKey, + }, + DiscoveredRejectReceived { + peer_id: PeerId, + }, + DiscoveredAcceptReceived { + peer_id: PeerId, + offer: EncryptedOffer, + }, + AnswerSend { + peer_id: PeerId, + answer: Option, + }, + /// Received request to get next peer discovery request from us. + RequestReceived { + peer_id: PeerId, + }, + DiscoveryRequestSend { + peer_id: PeerId, + }, + DiscoveredReceived { + peer_id: PeerId, + target_public_key: PublicKey, + }, + DiscoveredReject { + peer_id: PeerId, + }, + DiscoveredAccept { + peer_id: PeerId, + offer: Box, + }, + AnswerReceived { + peer_id: PeerId, + answer: Option, + }, + AnswerDecrypted { + peer_id: PeerId, + answer: P2pConnectionResponse, + }, +} + +impl P2pChannelsSignalingDiscoveryAction { + pub fn peer_id(&self) -> &PeerId { + match self { + Self::Init { peer_id } + | Self::Pending { peer_id } + | Self::Ready { peer_id } + | Self::RequestSend { peer_id } + | Self::DiscoveryRequestReceived { peer_id } + | Self::DiscoveredSend { peer_id, .. } + | Self::DiscoveredRejectReceived { peer_id } + | Self::DiscoveredAcceptReceived { peer_id, .. } + | Self::AnswerSend { peer_id, .. } + | Self::RequestReceived { peer_id } + | Self::DiscoveryRequestSend { peer_id, .. } + | Self::DiscoveredReceived { peer_id, .. } + | Self::DiscoveredReject { peer_id, .. } + | Self::DiscoveredAccept { peer_id, .. } + | Self::AnswerReceived { peer_id, .. } + | Self::AnswerDecrypted { peer_id, .. } => peer_id, + } + } +} + +impl redux::EnablingCondition for P2pChannelsSignalingDiscoveryAction { + fn is_enabled(&self, state: &P2pState, now: redux::Timestamp) -> bool { + match self { + P2pChannelsSignalingDiscoveryAction::Init { peer_id } => { + state.get_ready_peer(peer_id).map_or(false, |p| { + matches!( + &p.channels.signaling.discovery, + P2pChannelsSignalingDiscoveryState::Enabled + ) + }) + } + P2pChannelsSignalingDiscoveryAction::Pending { peer_id } => { + state.get_ready_peer(peer_id).map_or(false, |p| { + matches!( + &p.channels.signaling.discovery, + P2pChannelsSignalingDiscoveryState::Init { .. } + ) + }) + } + P2pChannelsSignalingDiscoveryAction::Ready { peer_id } => { + state.get_ready_peer(peer_id).map_or(false, |p| { + matches!( + &p.channels.signaling.discovery, + P2pChannelsSignalingDiscoveryState::Pending { .. } + ) + }) + } + P2pChannelsSignalingDiscoveryAction::RequestSend { peer_id } => { + state.get_ready_peer(peer_id).map_or(false, |p| { + match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { local, .. } => { + match local { + SignalingDiscoveryState::WaitingForRequest { .. } => true, + SignalingDiscoveryState::DiscoveredRejected { time, .. } + | SignalingDiscoveryState::Answered { time, .. } => { + // Allow one discovery request per minute. + // TODO(binier): make configurable + now.checked_sub(*time) + .map_or(false, |dur| dur.as_secs() >= 60) + } + _ => false, + } + } + _ => false, + } + }) + } + P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { local, .. } => { + matches!(local, SignalingDiscoveryState::Requested { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::DiscoveredSend { + peer_id, + target_public_key, + .. + } => { + let target_peer_id = target_public_key.peer_id(); + let has_peer_requested_discovery = + state.get_ready_peer(peer_id).map_or(false, |p| { + match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { local, .. } => { + matches!(local, SignalingDiscoveryState::DiscoveryRequested { .. }) + } + _ => false, + } + }); + let target_peer_already_discovering_them = + state.get_ready_peer(&target_peer_id).map_or(false, |p| { + p.channels.signaling.sent_discovered_peer_id() == Some(*peer_id) + }); + has_peer_requested_discovery + && !target_peer_already_discovering_them + && state.ready_peers_iter().all(|(_, p)| { + p.channels.signaling.sent_discovered_peer_id() != Some(target_peer_id) + }) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { local, .. } => { + matches!(local, SignalingDiscoveryState::Discovered { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { local, .. } => { + matches!(local, SignalingDiscoveryState::Discovered { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::AnswerSend { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { local, .. } => { + matches!(local, SignalingDiscoveryState::DiscoveredAccepted { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::RequestReceived { peer_id } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => matches!( + remote, + SignalingDiscoveryState::WaitingForRequest { .. } + | SignalingDiscoveryState::DiscoveredRejected { .. } + | SignalingDiscoveryState::Answered { .. } + ), + _ => false, + }), + // TODO(binier): constrain interval between these requests + // to handle malicious peers. + P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { peer_id, .. } => { + !state.already_has_min_peers() + && state.get_ready_peer(peer_id).map_or(false, |p| { + match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => { + matches!(remote, SignalingDiscoveryState::Requested { .. }) + } + _ => false, + } + }) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredReceived { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => { + matches!(remote, SignalingDiscoveryState::DiscoveryRequested { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::DiscoveredReject { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => { + matches!(remote, SignalingDiscoveryState::Discovered { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::DiscoveredAccept { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => { + matches!(remote, SignalingDiscoveryState::Discovered { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::AnswerReceived { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => { + matches!(remote, SignalingDiscoveryState::DiscoveredAccepted { .. }) + } + _ => false, + }), + P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => { + matches!(remote, SignalingDiscoveryState::DiscoveredAccepted { .. }) + } + _ => false, + }), + } + } +} + +impl From for crate::P2pAction { + fn from(action: P2pChannelsSignalingDiscoveryAction) -> Self { + Self::Channels(P2pChannelsAction::SignalingDiscovery(action)) + } +} diff --git a/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs new file mode 100644 index 0000000000..e043255b63 --- /dev/null +++ b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs @@ -0,0 +1,431 @@ +use openmina_core::{bug_condition, Substate}; +use redux::ActionWithMeta; + +use crate::{ + channels::signaling::{ + discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, + exchange::P2pChannelsSignalingExchangeAction, + }, + connection::{ + outgoing::{P2pConnectionOutgoingAction, P2pConnectionOutgoingInitOpts}, + P2pConnectionErrorResponse, P2pConnectionResponse, + }, + webrtc::SignalingMethod, + P2pState, +}; + +use super::{ + P2pChannelsSignalingDiscoveryAction, P2pChannelsSignalingDiscoveryState, + SignalingDiscoveryChannelMsg, SignalingDiscoveryState, +}; + +impl P2pChannelsSignalingDiscoveryState { + /// Substate is accessed + pub fn reducer( + mut state_context: Substate, + action: ActionWithMeta<&P2pChannelsSignalingDiscoveryAction>, + ) -> Result<(), String> + where + State: crate::P2pStateTrait, + Action: crate::P2pActionTrait, + { + let (action, meta) = action.split(); + let p2p_state = state_context.get_substate_mut()?; + let peer_id = *action.peer_id(); + let state = &mut p2p_state + .get_ready_peer_mut(&peer_id) + .ok_or_else(|| format!("Peer state not found for: {action:?}"))? + .channels + .signaling + .discovery; + + match action { + P2pChannelsSignalingDiscoveryAction::Init { .. } => { + *state = Self::Init { time: meta.time() }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::Init { peer_id }); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::Pending { .. } => { + *state = Self::Pending { time: meta.time() }; + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::Ready { .. } => { + *state = Self::Ready { + time: meta.time(), + local: SignalingDiscoveryState::WaitingForRequest { time: meta.time() }, + remote: SignalingDiscoveryState::WaitingForRequest { time: meta.time() }, + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pChannelsSignalingDiscoveryAction::RequestSend { peer_id }); + + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::RequestSend { .. } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::RequestSend`, state: {state:?}", + ); + return Ok(()); + }; + *local = SignalingDiscoveryState::Requested { time: meta.time() }; + + let dispatcher = state_context.into_dispatcher(); + let message = SignalingDiscoveryChannelMsg::GetNext; + dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + peer_id, + message, + }); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived { .. } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived`, state: {state:?}", + ); + return Ok(()); + }; + + *local = SignalingDiscoveryState::DiscoveryRequested { time: meta.time() }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let state: &P2pState = state.substate()?; + state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time()); + + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredSend { + target_public_key, .. + } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredSend`, state: {state:?}", + ); + return Ok(()); + }; + + *local = SignalingDiscoveryState::Discovered { + time: meta.time(), + target_public_key: target_public_key.clone(), + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + peer_id, + message: SignalingDiscoveryChannelMsg::Discovered { + target_public_key: target_public_key.clone(), + }, + }); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { .. } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived`, state: {state:?}", + ); + return Ok(()); + }; + + let target_public_key = match local { + SignalingDiscoveryState::Discovered { + target_public_key, .. + } => target_public_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived`, state: {state:?}", + ); + return Ok(()); + } + }; + + *local = SignalingDiscoveryState::DiscoveredRejected { + time: meta.time(), + target_public_key, + }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let state: &P2pState = state.substate()?; + state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time()); + + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived { offer, .. } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived`, state: {state:?}", + ); + return Ok(()); + }; + + let target_public_key = match local { + SignalingDiscoveryState::Discovered { + target_public_key, .. + } => target_public_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived`, state: {state:?}", + ); + return Ok(()); + } + }; + + *local = SignalingDiscoveryState::DiscoveredAccepted { + time: meta.time(), + target_public_key: target_public_key.clone(), + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pChannelsSignalingExchangeAction::OfferSend { + peer_id: target_public_key.peer_id(), + offerer_pub_key: peer_id.to_public_key().unwrap(), + offer: offer.clone(), + }); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::AnswerSend { answer, .. } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerSend`, state: {state:?}", + ); + return Ok(()); + }; + + *local = SignalingDiscoveryState::Answered { time: meta.time() }; + + let dispatcher = state_context.into_dispatcher(); + let message = SignalingDiscoveryChannelMsg::Answer(answer.clone()); + dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + peer_id, + message, + }); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::RequestReceived { .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::RequestReceived`, state: {state:?}", + ); + return Ok(()); + }; + + *remote = SignalingDiscoveryState::Requested { time: meta.time() }; + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend`, state: {state:?}", + ); + return Ok(()); + }; + + *remote = SignalingDiscoveryState::DiscoveryRequested { time: meta.time() }; + let dispatcher = state_context.into_dispatcher(); + let message = SignalingDiscoveryChannelMsg::Discover; + dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + peer_id, + message, + }); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredReceived { + target_public_key, .. + } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReceived`, state: {state:?}", + ); + return Ok(()); + }; + + *remote = SignalingDiscoveryState::Discovered { + time: meta.time(), + target_public_key: target_public_key.clone(), + }; + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let state: &P2pState = state.substate()?; + let action = P2pConnectionOutgoingAction::Init { + opts: P2pConnectionOutgoingInitOpts::WebRTC { + peer_id: target_public_key.peer_id(), + signaling: SignalingMethod::P2p { + relay_peer_id: peer_id, + }, + }, + rpc_id: None, + }; + let accepted = redux::EnablingCondition::is_enabled(&action, state, meta.time()); + if accepted { + dispatcher.push(action); + } else { + dispatcher + .push(P2pChannelsSignalingDiscoveryAction::DiscoveredReject { peer_id }); + } + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredReject { .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReject`, state: {state:?}", + ); + return Ok(()); + }; + + let target_public_key = match remote { + SignalingDiscoveryState::Discovered { + target_public_key, .. + } => target_public_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredReject`, state: {state:?}", + ); + return Ok(()); + } + }; + + *remote = SignalingDiscoveryState::DiscoveredRejected { + time: meta.time(), + target_public_key, + }; + let dispatcher = state_context.into_dispatcher(); + let message = SignalingDiscoveryChannelMsg::DiscoveredReject; + dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + peer_id, + message, + }); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::DiscoveredAccept { offer, .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAccept`, state: {state:?}", + ); + return Ok(()); + }; + + let target_public_key = match remote { + SignalingDiscoveryState::Discovered { + target_public_key, .. + } => target_public_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::DiscoveredAccept`, state: {state:?}", + ); + return Ok(()); + } + }; + + *remote = SignalingDiscoveryState::DiscoveredAccepted { + time: meta.time(), + target_public_key: target_public_key.clone(), + }; + let dispatcher = state_context.into_dispatcher(); + // TODO(binier): this action might not be enabled, in + // which case we sshould be rejecting discovered peer. + dispatcher.push(P2pConnectionOutgoingAction::OfferSendSuccess { + peer_id: target_public_key.peer_id(), + }); + dispatcher.push( + P2pChannelsSignalingDiscoveryEffectfulAction::OfferEncryptAndSend { + peer_id, + pub_key: target_public_key, + offer: offer.clone(), + }, + ); + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::AnswerReceived { answer, .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerReceived`, state: {state:?}", + ); + return Ok(()); + }; + + let target_public_key = match remote { + SignalingDiscoveryState::DiscoveredAccepted { + target_public_key, .. + } => target_public_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerReceived`, state: {state:?}", + ); + return Ok(()); + } + }; + + let dispatcher = state_context.into_dispatcher(); + match answer { + // TODO(binier): custom error + None => dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError { + peer_id: target_public_key.peer_id(), + error: P2pConnectionErrorResponse::InternalError, + }), + Some(answer) => dispatcher.push( + P2pChannelsSignalingDiscoveryEffectfulAction::AnswerDecrypt { + peer_id, + pub_key: target_public_key, + answer: answer.clone(), + }, + ), + } + Ok(()) + } + P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { answer, .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerDecrypted`, state: {state:?}", + ); + return Ok(()); + }; + + let target_public_key = match remote { + SignalingDiscoveryState::DiscoveredAccepted { + target_public_key, .. + } => target_public_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingDiscoveryAction::AnswerDecrypted`, state: {state:?}", + ); + return Ok(()); + } + }; + + *remote = SignalingDiscoveryState::Answered { time: meta.time() }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + match answer { + P2pConnectionResponse::Accepted(answer) => { + dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvSuccess { + peer_id: target_public_key.peer_id(), + answer: answer.clone(), + }) + } + P2pConnectionResponse::Rejected(reason) => { + dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError { + peer_id: target_public_key.peer_id(), + error: P2pConnectionErrorResponse::Rejected(*reason), + }) + } + P2pConnectionResponse::SignalDecryptionFailed => { + dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError { + peer_id: target_public_key.peer_id(), + error: P2pConnectionErrorResponse::SignalDecryptionFailed, + }) + } + P2pConnectionResponse::InternalError => { + dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvError { + peer_id: target_public_key.peer_id(), + error: P2pConnectionErrorResponse::InternalError, + }) + } + } + + let state: &P2pState = state.substate()?; + state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time()); + Ok(()) + } + } + } +} diff --git a/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_state.rs b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_state.rs new file mode 100644 index 0000000000..06ba5de293 --- /dev/null +++ b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_state.rs @@ -0,0 +1,57 @@ +use serde::{Deserialize, Serialize}; + +use crate::identity::PublicKey; + +#[allow(clippy::large_enum_variant)] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum P2pChannelsSignalingDiscoveryState { + Disabled, + Enabled, + Init { + time: redux::Timestamp, + }, + Pending { + time: redux::Timestamp, + }, + Ready { + time: redux::Timestamp, + /// We are the requestors here. + local: SignalingDiscoveryState, + /// We are the responders here. + remote: SignalingDiscoveryState, + }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum SignalingDiscoveryState { + WaitingForRequest { + time: redux::Timestamp, + }, + Requested { + time: redux::Timestamp, + }, + DiscoveryRequested { + time: redux::Timestamp, + }, + Discovered { + time: redux::Timestamp, + target_public_key: PublicKey, + }, + DiscoveredRejected { + time: redux::Timestamp, + target_public_key: PublicKey, + }, + DiscoveredAccepted { + time: redux::Timestamp, + target_public_key: PublicKey, + }, + Answered { + time: redux::Timestamp, + }, +} + +impl P2pChannelsSignalingDiscoveryState { + pub fn is_ready(&self) -> bool { + matches!(self, Self::Ready { .. }) + } +} diff --git a/p2p/src/channels/signaling/discovery_effectful/mod.rs b/p2p/src/channels/signaling/discovery_effectful/mod.rs new file mode 100644 index 0000000000..ae681677f3 --- /dev/null +++ b/p2p/src/channels/signaling/discovery_effectful/mod.rs @@ -0,0 +1,4 @@ +mod p2p_channels_signaling_discovery_effectful_actions; +pub use p2p_channels_signaling_discovery_effectful_actions::P2pChannelsSignalingDiscoveryEffectfulAction; + +mod p2p_channels_signaling_discovery_effectful_effects; diff --git a/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_actions.rs b/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_actions.rs new file mode 100644 index 0000000000..20e133e489 --- /dev/null +++ b/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_actions.rs @@ -0,0 +1,44 @@ +use openmina_core::ActionEvent; +use serde::{Deserialize, Serialize}; + +use crate::{ + channels::{signaling::discovery::SignalingDiscoveryChannelMsg, P2pChannelsEffectfulAction}, + connection::Offer, + identity::PublicKey, + webrtc::EncryptedAnswer, + P2pState, PeerId, +}; + +#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] +#[action_event(fields(display(peer_id)))] +pub enum P2pChannelsSignalingDiscoveryEffectfulAction { + Init { + peer_id: PeerId, + }, + MessageSend { + peer_id: PeerId, + message: SignalingDiscoveryChannelMsg, + }, + OfferEncryptAndSend { + peer_id: PeerId, + pub_key: PublicKey, + offer: Box, + }, + AnswerDecrypt { + peer_id: PeerId, + pub_key: PublicKey, + answer: EncryptedAnswer, + }, +} + +impl redux::EnablingCondition for P2pChannelsSignalingDiscoveryEffectfulAction { + fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { + true + } +} + +impl From for crate::P2pEffectfulAction { + fn from(action: P2pChannelsSignalingDiscoveryEffectfulAction) -> crate::P2pEffectfulAction { + crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::SignalingDiscovery(action)) + } +} diff --git a/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_effects.rs b/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_effects.rs new file mode 100644 index 0000000000..b95e028cfd --- /dev/null +++ b/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_effects.rs @@ -0,0 +1,75 @@ +use redux::ActionMeta; + +use crate::{ + channels::{ + signaling::discovery::{P2pChannelsSignalingDiscoveryAction, SignalingDiscoveryChannelMsg}, + ChannelId, MsgId, + }, + connection::P2pConnectionResponse, + P2pChannelsService, +}; + +use super::P2pChannelsSignalingDiscoveryEffectfulAction; + +impl P2pChannelsSignalingDiscoveryEffectfulAction { + pub fn effects(self, _meta: &ActionMeta, store: &mut Store) + where + Store: crate::P2pStore, + Store::Service: P2pChannelsService, + { + match self { + P2pChannelsSignalingDiscoveryEffectfulAction::Init { peer_id } => { + store + .service() + .channel_open(peer_id, ChannelId::SignalingDiscovery); + store.dispatch(P2pChannelsSignalingDiscoveryAction::Pending { peer_id }); + } + P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { peer_id, message } => { + message_send(store.service(), peer_id, message); + } + P2pChannelsSignalingDiscoveryEffectfulAction::OfferEncryptAndSend { + peer_id, + pub_key, + offer, + } => match store.service().encrypt(&pub_key, &*offer) { + Err(()) => { + // todo!("Failed to encrypt webrtc offer. Handle it.") + } + Ok(offer) => { + let message = SignalingDiscoveryChannelMsg::DiscoveredAccept(offer); + message_send(store.service(), peer_id, message); + } + }, + P2pChannelsSignalingDiscoveryEffectfulAction::AnswerDecrypt { + peer_id, + pub_key, + answer, + } => { + match store + .service() + .decrypt::(&pub_key, &answer) + { + Err(()) => { + store.dispatch(P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { + peer_id, + answer: P2pConnectionResponse::SignalDecryptionFailed, + }); + } + Ok(answer) => { + store.dispatch(P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { + peer_id, + answer, + }); + } + } + } + } + } +} + +fn message_send(service: &mut S, peer_id: crate::PeerId, message: SignalingDiscoveryChannelMsg) +where + S: P2pChannelsService, +{ + service.channel_send(peer_id, MsgId::first(), message.into()) +} diff --git a/p2p/src/channels/signaling/exchange/mod.rs b/p2p/src/channels/signaling/exchange/mod.rs new file mode 100644 index 0000000000..223eaa27d5 --- /dev/null +++ b/p2p/src/channels/signaling/exchange/mod.rs @@ -0,0 +1,28 @@ +mod p2p_channels_signaling_exchange_state; +pub use p2p_channels_signaling_exchange_state::*; + +mod p2p_channels_signaling_exchange_actions; +pub use p2p_channels_signaling_exchange_actions::*; + +mod p2p_channels_signaling_exchange_reducer; + +use binprot_derive::{BinProtRead, BinProtWrite}; +use serde::{Deserialize, Serialize}; + +use crate::{ + identity::PublicKey, + webrtc::{EncryptedAnswer, EncryptedOffer}, +}; + +#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)] +pub enum SignalingExchangeChannelMsg { + /// Get next incoming offer to us. + GetNext, + /// Relayed offer from dialer to you. + OfferToYou { + offerer_pub_key: PublicKey, + offer: EncryptedOffer, + }, + /// Answer to dialer to relay, if you aren't dialer. + Answer(Option), +} diff --git a/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_actions.rs b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_actions.rs new file mode 100644 index 0000000000..93d64d4f6d --- /dev/null +++ b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_actions.rs @@ -0,0 +1,187 @@ +use openmina_core::ActionEvent; +use serde::{Deserialize, Serialize}; + +use crate::{ + channels::P2pChannelsAction, + connection::P2pConnectionResponse, + identity::PublicKey, + webrtc::{EncryptedAnswer, EncryptedOffer, Offer}, + P2pState, PeerId, +}; + +use super::{P2pChannelsSignalingExchangeState, SignalingExchangeState}; + +#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] +#[action_event(fields(display(peer_id)))] +pub enum P2pChannelsSignalingExchangeAction { + /// Initialize channel. + Init { + peer_id: PeerId, + }, + Pending { + peer_id: PeerId, + }, + /// Channel is ready. + Ready { + peer_id: PeerId, + }, + /// Send request to get next offer/incoming connection from peer. + RequestSend { + peer_id: PeerId, + }, + OfferReceived { + peer_id: PeerId, + offerer_pub_key: PublicKey, + offer: EncryptedOffer, + }, + OfferDecryptError { + peer_id: PeerId, + }, + OfferDecryptSuccess { + peer_id: PeerId, + offer: Offer, + }, + AnswerSend { + peer_id: PeerId, + answer: P2pConnectionResponse, + }, + /// Received request to get next offer/incoming connection from peer. + RequestReceived { + peer_id: PeerId, + }, + OfferSend { + peer_id: PeerId, + offerer_pub_key: PublicKey, + offer: EncryptedOffer, + }, + AnswerReceived { + peer_id: PeerId, + answer: Option, + }, +} + +impl P2pChannelsSignalingExchangeAction { + pub fn peer_id(&self) -> &PeerId { + match self { + Self::Init { peer_id } + | Self::Pending { peer_id } + | Self::Ready { peer_id } + | Self::RequestSend { peer_id } + | Self::OfferReceived { peer_id, .. } + | Self::OfferDecryptError { peer_id, .. } + | Self::OfferDecryptSuccess { peer_id, .. } + | Self::AnswerSend { peer_id, .. } + | Self::RequestReceived { peer_id } + | Self::OfferSend { peer_id, .. } + | Self::AnswerReceived { peer_id, .. } => peer_id, + } + } +} + +impl redux::EnablingCondition for P2pChannelsSignalingExchangeAction { + fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool { + match self { + P2pChannelsSignalingExchangeAction::Init { peer_id } => { + state.get_ready_peer(peer_id).map_or(false, |p| { + matches!( + &p.channels.signaling.exchange, + P2pChannelsSignalingExchangeState::Enabled + ) + }) + } + P2pChannelsSignalingExchangeAction::Pending { peer_id } => { + state.get_ready_peer(peer_id).map_or(false, |p| { + matches!( + &p.channels.signaling.exchange, + P2pChannelsSignalingExchangeState::Init { .. } + ) + }) + } + P2pChannelsSignalingExchangeAction::Ready { peer_id } => { + state.get_ready_peer(peer_id).map_or(false, |p| { + matches!( + &p.channels.signaling.exchange, + P2pChannelsSignalingExchangeState::Pending { .. } + ) + }) + } + P2pChannelsSignalingExchangeAction::RequestSend { peer_id } => { + !state.already_has_max_peers() + && state.get_ready_peer(peer_id).map_or(false, |p| { + match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { local, .. } => matches!( + local, + SignalingExchangeState::WaitingForRequest { .. } + | SignalingExchangeState::Answered { .. }, + ), + _ => false, + } + }) + } + P2pChannelsSignalingExchangeAction::OfferReceived { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { local, .. } => { + matches!(local, SignalingExchangeState::Requested { .. }) + } + _ => false, + }), + P2pChannelsSignalingExchangeAction::OfferDecryptError { peer_id } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { local, .. } => { + matches!(local, SignalingExchangeState::Offered { .. }) + } + _ => false, + }), + P2pChannelsSignalingExchangeAction::OfferDecryptSuccess { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { local, .. } => { + matches!(local, SignalingExchangeState::Offered { .. }) + } + _ => false, + }), + P2pChannelsSignalingExchangeAction::AnswerSend { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { local, .. } => { + matches!(local, SignalingExchangeState::Offered { .. }) + } + _ => false, + }), + P2pChannelsSignalingExchangeAction::RequestReceived { peer_id } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { remote, .. } => matches!( + remote, + SignalingExchangeState::WaitingForRequest { .. } + | SignalingExchangeState::Answered { .. } + ), + _ => false, + }), + P2pChannelsSignalingExchangeAction::OfferSend { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { remote, .. } => { + matches!(remote, SignalingExchangeState::Requested { .. }) + } + _ => false, + }), + P2pChannelsSignalingExchangeAction::AnswerReceived { peer_id, .. } => state + .get_ready_peer(peer_id) + .map_or(false, |p| match &p.channels.signaling.exchange { + P2pChannelsSignalingExchangeState::Ready { remote, .. } => { + matches!(remote, SignalingExchangeState::Offered { .. }) + } + _ => false, + }), + } + } +} + +impl From for crate::P2pAction { + fn from(action: P2pChannelsSignalingExchangeAction) -> Self { + Self::Channels(P2pChannelsAction::SignalingExchange(action)) + } +} diff --git a/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs new file mode 100644 index 0000000000..3c4d8dc7c4 --- /dev/null +++ b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs @@ -0,0 +1,247 @@ +use openmina_core::{bug_condition, Substate}; +use redux::ActionWithMeta; + +use crate::{ + channels::signaling::{ + discovery::P2pChannelsSignalingDiscoveryAction, + exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, + }, + connection::{ + incoming::{ + IncomingSignalingMethod, P2pConnectionIncomingAction, P2pConnectionIncomingInitOpts, + }, + P2pConnectionResponse, + }, + P2pState, +}; + +use super::{ + P2pChannelsSignalingExchangeAction, P2pChannelsSignalingExchangeState, + SignalingExchangeChannelMsg, SignalingExchangeState, +}; + +impl P2pChannelsSignalingExchangeState { + /// Substate is accessed + pub fn reducer( + mut state_context: Substate, + action: ActionWithMeta<&P2pChannelsSignalingExchangeAction>, + ) -> Result<(), String> + where + State: crate::P2pStateTrait, + Action: crate::P2pActionTrait, + { + let (action, meta) = action.split(); + let p2p_state = state_context.get_substate_mut()?; + let peer_id = *action.peer_id(); + let state = &mut p2p_state + .get_ready_peer_mut(&peer_id) + .ok_or_else(|| format!("Peer state not found for: {action:?}"))? + .channels + .signaling + .exchange; + + match action { + P2pChannelsSignalingExchangeAction::Init { .. } => { + *state = Self::Init { time: meta.time() }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::Init { peer_id }); + Ok(()) + } + P2pChannelsSignalingExchangeAction::Pending { .. } => { + *state = Self::Pending { time: meta.time() }; + Ok(()) + } + P2pChannelsSignalingExchangeAction::Ready { .. } => { + *state = Self::Ready { + time: meta.time(), + local: SignalingExchangeState::WaitingForRequest { time: meta.time() }, + remote: SignalingExchangeState::WaitingForRequest { time: meta.time() }, + }; + + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pChannelsSignalingExchangeAction::RequestSend { peer_id }); + + Ok(()) + } + P2pChannelsSignalingExchangeAction::RequestSend { .. } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::RequestSend`, state: {state:?}", + ); + return Ok(()); + }; + *local = SignalingExchangeState::Requested { time: meta.time() }; + + let dispatcher = state_context.into_dispatcher(); + let message = SignalingExchangeChannelMsg::GetNext; + dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::MessageSend { + peer_id, + message, + }); + Ok(()) + } + P2pChannelsSignalingExchangeAction::OfferReceived { + offer, + offerer_pub_key, + .. + } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::OfferReceived`, state: {state:?}", + ); + return Ok(()); + }; + + *local = SignalingExchangeState::Offered { + time: meta.time(), + offerer_pub_key: offerer_pub_key.clone(), + }; + + let dispatcher = state_context.into_dispatcher(); + let offer = offer.clone(); + dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::OfferDecrypt { + peer_id, + pub_key: offerer_pub_key.clone(), + offer, + }); + Ok(()) + } + P2pChannelsSignalingExchangeAction::OfferDecryptError { .. } => { + let dispatcher = state_context.into_dispatcher(); + let answer = P2pConnectionResponse::SignalDecryptionFailed; + dispatcher.push(P2pChannelsSignalingExchangeAction::AnswerSend { peer_id, answer }); + Ok(()) + } + P2pChannelsSignalingExchangeAction::OfferDecryptSuccess { offer, .. } => { + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let state: &P2pState = state.substate()?; + let opts = P2pConnectionIncomingInitOpts { + peer_id: offer.identity_pub_key.peer_id(), + signaling: IncomingSignalingMethod::P2p { + relay_peer_id: peer_id, + }, + offer: offer.clone().into(), + }; + match state.incoming_accept(opts.peer_id, &opts.offer) { + Ok(_) => { + dispatcher.push(P2pConnectionIncomingAction::Init { opts, rpc_id: None }); + } + Err(reason) => { + let answer = P2pConnectionResponse::Rejected(reason); + dispatcher.push(P2pChannelsSignalingExchangeAction::AnswerSend { + peer_id, + answer, + }); + } + } + Ok(()) + } + P2pChannelsSignalingExchangeAction::AnswerSend { answer, .. } => { + let Self::Ready { local, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerSend`, state: {state:?}", + ); + return Ok(()); + }; + + let offerer_pub_key = match local { + SignalingExchangeState::Offered { + offerer_pub_key, .. + } => offerer_pub_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerSend`, local state: {state:?}", + ); + return Ok(()); + } + }; + + *local = SignalingExchangeState::Answered { time: meta.time() }; + + let answer = answer.clone(); + let dispatcher = state_context.into_dispatcher(); + dispatcher.push( + P2pChannelsSignalingExchangeEffectfulAction::AnswerEncryptAndSend { + peer_id, + pub_key: offerer_pub_key.clone(), + answer: Some(answer), + }, + ); + dispatcher.push(P2pConnectionIncomingAction::AnswerSendSuccess { + peer_id: offerer_pub_key.peer_id(), + }); + Ok(()) + } + P2pChannelsSignalingExchangeAction::RequestReceived { .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::RequestReceived`, state: {state:?}", + ); + return Ok(()); + }; + + *remote = SignalingExchangeState::Requested { time: meta.time() }; + + let (dispatcher, state) = state_context.into_dispatcher_and_state(); + let state: &P2pState = state.substate()?; + state.webrtc_discovery_respond_with_availble_peers(dispatcher, meta.time()); + Ok(()) + } + P2pChannelsSignalingExchangeAction::OfferSend { + offer, + offerer_pub_key, + .. + } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::OfferSend`, state: {state:?}", + ); + return Ok(()); + }; + + *remote = SignalingExchangeState::Offered { + time: meta.time(), + offerer_pub_key: offerer_pub_key.clone(), + }; + let dispatcher = state_context.into_dispatcher(); + let message = SignalingExchangeChannelMsg::OfferToYou { + offerer_pub_key: offerer_pub_key.clone(), + offer: offer.clone(), + }; + dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::MessageSend { + peer_id, + message, + }); + Ok(()) + } + P2pChannelsSignalingExchangeAction::AnswerReceived { answer, .. } => { + let Self::Ready { remote, .. } = state else { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerReceived`, state: {state:?}", + ); + return Ok(()); + }; + + let offerer_pub_key = match remote { + SignalingExchangeState::Offered { + offerer_pub_key, .. + } => offerer_pub_key.clone(), + state => { + bug_condition!( + "Invalid state for `P2pChannelsSignalingExchangeAction::AnswerReceived`, state: {state:?}", + ); + return Ok(()); + } + }; + *remote = SignalingExchangeState::Answered { time: meta.time() }; + let dispatcher = state_context.into_dispatcher(); + dispatcher.push(P2pChannelsSignalingDiscoveryAction::AnswerSend { + peer_id: offerer_pub_key.peer_id(), + answer: answer.clone(), + }); + Ok(()) + } + } + } +} diff --git a/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_state.rs b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_state.rs new file mode 100644 index 0000000000..346efa5a30 --- /dev/null +++ b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_state.rs @@ -0,0 +1,49 @@ +use serde::{Deserialize, Serialize}; + +use crate::identity::PublicKey; + +#[allow(clippy::large_enum_variant)] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum P2pChannelsSignalingExchangeState { + Disabled, + Enabled, + Init { + time: redux::Timestamp, + }, + Pending { + time: redux::Timestamp, + }, + Ready { + time: redux::Timestamp, + /// We are the requestors here. + local: SignalingExchangeState, + /// We are the responders here. + remote: SignalingExchangeState, + }, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum SignalingExchangeState { + /// Next offer wasn't requested, so we shouldn't receive/send any offers. + WaitingForRequest { + time: redux::Timestamp, + }, + /// Next offer/incoming connection was requested. Offer will be received + /// once/if peer wants to connect. + Requested { + time: redux::Timestamp, + }, + Offered { + time: redux::Timestamp, + offerer_pub_key: PublicKey, + }, + Answered { + time: redux::Timestamp, + }, +} + +impl P2pChannelsSignalingExchangeState { + pub fn is_ready(&self) -> bool { + matches!(self, Self::Ready { .. }) + } +} diff --git a/p2p/src/channels/signaling/exchange_effectful/mod.rs b/p2p/src/channels/signaling/exchange_effectful/mod.rs new file mode 100644 index 0000000000..89a36e2c5b --- /dev/null +++ b/p2p/src/channels/signaling/exchange_effectful/mod.rs @@ -0,0 +1,4 @@ +mod p2p_channels_signaling_exchange_effectful_actions; +pub use p2p_channels_signaling_exchange_effectful_actions::P2pChannelsSignalingExchangeEffectfulAction; + +mod p2p_channels_signaling_exchange_effectful_effects; diff --git a/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_actions.rs b/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_actions.rs new file mode 100644 index 0000000000..6daa07af78 --- /dev/null +++ b/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_actions.rs @@ -0,0 +1,44 @@ +use openmina_core::ActionEvent; +use serde::{Deserialize, Serialize}; + +use crate::{ + channels::{signaling::exchange::SignalingExchangeChannelMsg, P2pChannelsEffectfulAction}, + connection::P2pConnectionResponse, + identity::PublicKey, + webrtc::EncryptedOffer, + P2pState, PeerId, +}; + +#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] +#[action_event(fields(display(peer_id)))] +pub enum P2pChannelsSignalingExchangeEffectfulAction { + Init { + peer_id: PeerId, + }, + MessageSend { + peer_id: PeerId, + message: SignalingExchangeChannelMsg, + }, + OfferDecrypt { + peer_id: PeerId, + pub_key: PublicKey, + offer: EncryptedOffer, + }, + AnswerEncryptAndSend { + peer_id: PeerId, + pub_key: PublicKey, + answer: Option, + }, +} + +impl redux::EnablingCondition for P2pChannelsSignalingExchangeEffectfulAction { + fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { + true + } +} + +impl From for crate::P2pEffectfulAction { + fn from(action: P2pChannelsSignalingExchangeEffectfulAction) -> crate::P2pEffectfulAction { + crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::SignalingExchange(action)) + } +} diff --git a/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_effects.rs b/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_effects.rs new file mode 100644 index 0000000000..d7fa962bc4 --- /dev/null +++ b/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_effects.rs @@ -0,0 +1,96 @@ +use openmina_core::bug_condition; +use redux::ActionMeta; + +use crate::{ + channels::{ + signaling::exchange::{P2pChannelsSignalingExchangeAction, SignalingExchangeChannelMsg}, + ChannelId, MsgId, + }, + webrtc::{EncryptedAnswer, Offer}, + P2pChannelsService, +}; + +use super::P2pChannelsSignalingExchangeEffectfulAction; + +impl P2pChannelsSignalingExchangeEffectfulAction { + pub fn effects(self, _meta: &ActionMeta, store: &mut Store) + where + Store: crate::P2pStore, + Store::Service: P2pChannelsService, + { + match self { + P2pChannelsSignalingExchangeEffectfulAction::Init { peer_id } => { + store + .service() + .channel_open(peer_id, ChannelId::SignalingExchange); + store.dispatch(P2pChannelsSignalingExchangeAction::Pending { peer_id }); + } + P2pChannelsSignalingExchangeEffectfulAction::MessageSend { peer_id, message } => { + message_send(store.service(), peer_id, message); + } + P2pChannelsSignalingExchangeEffectfulAction::OfferDecrypt { + peer_id, + pub_key, + offer, + } => { + match store.service().decrypt::(&pub_key, &offer) { + Err(()) => { + store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptError { + peer_id, + }); + } + Ok(offer) if offer.identity_pub_key != pub_key => { + // TODO(binier): propagate specific error. + // This is invalid behavior either from relayer or offerer. + store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptError { + peer_id, + }); + } + Ok(offer) => { + store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptSuccess { + peer_id, + offer, + }); + } + } + } + P2pChannelsSignalingExchangeEffectfulAction::AnswerEncryptAndSend { + peer_id, + pub_key, + answer, + } => { + let answer = match answer { + None => { + answer_message_send(store.service(), peer_id, None); + return; + } + Some(v) => v, + }; + match store.service().encrypt(&pub_key, &answer) { + Err(()) => bug_condition!("Failed to encrypt webrtc answer. Shouldn't happen since we managed to decrypt sent offer."), + Ok(answer) => { + answer_message_send(store.service(), peer_id, Some(answer)); + } + } + } + } + } +} + +fn answer_message_send(service: &mut S, peer_id: crate::PeerId, answer: Option) +where + S: P2pChannelsService, +{ + message_send( + service, + peer_id, + SignalingExchangeChannelMsg::Answer(answer), + ) +} + +fn message_send(service: &mut S, peer_id: crate::PeerId, message: SignalingExchangeChannelMsg) +where + S: P2pChannelsService, +{ + service.channel_send(peer_id, MsgId::first(), message.into()) +} diff --git a/p2p/src/channels/signaling/mod.rs b/p2p/src/channels/signaling/mod.rs new file mode 100644 index 0000000000..7ee73a0da5 --- /dev/null +++ b/p2p/src/channels/signaling/mod.rs @@ -0,0 +1,79 @@ +//! There are 2 state machines in this module: +//! 1. `discovery` - used for discovering a new target peer and initiating +//! signaling process. +//! 2. `exchange` - used by intermediary peer to relay an offer to the +//! target peer and receive an answer from it. +//! +//! These are the overall steps that happens in these state machines in +//! order to connect two (dialer and listener) peers to each other using +//! intermediary peer (relayer): +//! 1. [discovery] Dialer asks relayer to discover an available peer. +//! 2. [discovery] Relayer responds with available peer's (listener's) public key. +//! 3. [discovery] Dialer accepts/rejects the target peer (listener). +//! 4. [discovery] If dialer accepts the peer, it sends webrtc offer to relayer. +//! 5. [exchange] Relayer relays received webrtc offer to the listener peer. +//! 6. [exchange] Relayer receives webrtc answer from the listener peer. +//! 7. [discovery] Relayer relays the answer to the dialer. + +pub mod discovery; +pub mod discovery_effectful; +pub mod exchange; +pub mod exchange_effectful; + +mod p2p_channels_signaling_state; +pub use p2p_channels_signaling_state::*; + +use std::collections::BTreeSet; + +use discovery::P2pChannelsSignalingDiscoveryAction; + +impl crate::P2pState { + pub(super) fn webrtc_discovery_respond_with_availble_peers( + &self, + dispatcher: &mut redux::Dispatcher, + time: redux::Timestamp, + ) where + State: crate::P2pStateTrait, + Action: crate::P2pActionTrait, + { + let (mut available_peers, requests) = self.ready_peers_iter().fold( + (BTreeSet::new(), BTreeSet::new()), + |(mut available, mut requests), (peer_id, peer)| { + if peer.channels.signaling.is_looking_for_incoming_peer() { + available.insert(peer_id); + } + if peer.channels.signaling.is_looking_for_peer() { + requests.insert(peer_id); + } else if let Some(peer_id) = peer.channels.signaling.sent_discovered_peer_id() { + available.remove(&peer_id); + } + (available, requests) + }, + ); + + /// random shuffle available peers + use rand::{seq::SliceRandom, SeedableRng}; + let mut rng = rand::rngs::StdRng::seed_from_u64(time.into()); + let mut available_peers_ordered = available_peers.iter().copied().collect::>(); + available_peers_ordered.shuffle(&mut rng); + + for &requester in requests { + if available_peers.is_empty() { + break; + } + for &&target_peer_id in &available_peers_ordered { + if target_peer_id == requester || !available_peers.contains(&target_peer_id) { + continue; + } + let action = P2pChannelsSignalingDiscoveryAction::DiscoveredSend { + peer_id: requester, + target_public_key: target_peer_id.to_public_key().unwrap(), + }; + if redux::EnablingCondition::is_enabled(&action, self, time) { + dispatcher.push(action); + available_peers.remove(&target_peer_id); + } + } + } + } +} diff --git a/p2p/src/channels/signaling/p2p_channels_signaling_state.rs b/p2p/src/channels/signaling/p2p_channels_signaling_state.rs new file mode 100644 index 0000000000..a3c2c64f82 --- /dev/null +++ b/p2p/src/channels/signaling/p2p_channels_signaling_state.rs @@ -0,0 +1,76 @@ +use serde::{Deserialize, Serialize}; + +use crate::PeerId; + +use super::{ + discovery::{P2pChannelsSignalingDiscoveryState, SignalingDiscoveryState}, + exchange::{P2pChannelsSignalingExchangeState, SignalingExchangeState}, +}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct P2pChannelsSignalingState { + pub discovery: P2pChannelsSignalingDiscoveryState, + pub exchange: P2pChannelsSignalingExchangeState, +} + +impl P2pChannelsSignalingState { + pub fn am_looking_for_peer(&self) -> bool { + match &self.discovery { + P2pChannelsSignalingDiscoveryState::Ready { remote, .. } => { + matches!(remote, SignalingDiscoveryState::DiscoveryRequested { .. }) + } + _ => false, + } + } + + pub fn received_discovered_peer_id(&self) -> Option { + match &self.discovery { + P2pChannelsSignalingDiscoveryState::Ready { + remote: + SignalingDiscoveryState::Discovered { + target_public_key, .. + } + | SignalingDiscoveryState::DiscoveredAccepted { + target_public_key, .. + }, + .. + } => Some(target_public_key.peer_id()), + _ => None, + } + } + + pub fn is_looking_for_peer(&self) -> bool { + match &self.discovery { + P2pChannelsSignalingDiscoveryState::Ready { local, .. } => { + matches!(local, SignalingDiscoveryState::DiscoveryRequested { .. }) + } + _ => false, + } + } + + pub fn sent_discovered_peer_id(&self) -> Option { + match &self.discovery { + P2pChannelsSignalingDiscoveryState::Ready { + local: + SignalingDiscoveryState::Discovered { + target_public_key, .. + } + | SignalingDiscoveryState::DiscoveredAccepted { + target_public_key, .. + }, + .. + } => Some(target_public_key.peer_id()), + _ => None, + } + } + + pub fn is_looking_for_incoming_peer(&self) -> bool { + matches!( + self.exchange, + P2pChannelsSignalingExchangeState::Ready { + remote: SignalingExchangeState::Requested { .. }, + .. + } + ) + } +} diff --git a/p2p/src/connection/incoming/mod.rs b/p2p/src/connection/incoming/mod.rs index d47fde1365..6290765252 100644 --- a/p2p/src/connection/incoming/mod.rs +++ b/p2p/src/connection/incoming/mod.rs @@ -21,7 +21,10 @@ pub struct P2pConnectionIncomingInitOpts { // TODO(binier): maybe move to `crate::webrtc`? #[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone)] pub enum IncomingSignalingMethod { + /// Http rpc is used for sending offer and getting answer as a response. Http, + /// Intermediary/Relay peer is used for exchanging offer and answer messages. + P2p { relay_peer_id: PeerId }, } impl P2pState { diff --git a/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs b/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs index 5eab82f470..a3716bb4e6 100644 --- a/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs +++ b/p2p/src/connection/incoming/p2p_connection_incoming_reducer.rs @@ -5,6 +5,7 @@ use openmina_core::{bug_condition, debug, warn, Substate}; use redux::{ActionWithMeta, Dispatcher, Timestamp}; use crate::{ + channels::signaling::exchange::P2pChannelsSignalingExchangeAction, connection::{ incoming::P2pConnectionIncomingError, incoming_effectful::P2pConnectionIncomingEffectfulAction, @@ -44,7 +45,7 @@ impl P2pConnectionIncomingState { .entry(peer_id) .or_insert_with(|| P2pPeerState { is_libp2p: false, - dial_opts: opts.offer.listen_port.map(|listen_port| { + dial_opts: opts.offer.listen_port.and_then(|listen_port| { let signaling = match opts.signaling { IncomingSignalingMethod::Http => { SignalingMethod::Http(HttpSignalingInfo { @@ -52,8 +53,9 @@ impl P2pConnectionIncomingState { port: listen_port, }) } + IncomingSignalingMethod::P2p { .. } => return None, }; - P2pConnectionOutgoingInitOpts::WebRTC { peer_id, signaling } + Some(P2pConnectionOutgoingInitOpts::WebRTC { peer_id, signaling }) }), status: P2pPeerStatus::Connecting(P2pConnectionState::incoming_init(opts)), identify: None, @@ -144,33 +146,46 @@ impl P2pConnectionIncomingState { let state = p2p_state .incoming_peer_connection_mut(peer_id) .ok_or_else(|| format!("Invalid state for: {:?}", action))?; - if let Self::AnswerSdpCreateSuccess { + + let Self::AnswerSdpCreateSuccess { signaling, offer, rpc_id, .. } = state - { - *state = Self::AnswerReady { - time: meta.time(), - signaling: signaling.clone(), - offer: offer.clone(), - answer: answer.clone(), - rpc_id: rpc_id.take(), - }; - } else { + else { bug_condition!( "Invalid state for `P2pConnectionIncomingAction::AnswerReady`: {:?}", state ); - } + return Ok(()); + }; + let signaling = signaling.clone(); + *state = Self::AnswerReady { + time: meta.time(), + signaling: signaling.clone(), + offer: offer.clone(), + answer: answer.clone(), + rpc_id: rpc_id.take(), + }; + let (dispatcher, state) = state_context.into_dispatcher_and_state(); let p2p_state: &P2pState = state.substate()?; - dispatcher.push(P2pConnectionIncomingEffectfulAction::AnswerSend { - peer_id: *peer_id, - answer: answer.clone(), - }); + match signaling { + IncomingSignalingMethod::Http => { + dispatcher.push(P2pConnectionIncomingEffectfulAction::AnswerSend { + peer_id: *peer_id, + answer: answer.clone(), + }); + } + IncomingSignalingMethod::P2p { relay_peer_id } => { + dispatcher.push(P2pChannelsSignalingExchangeAction::AnswerSend { + peer_id: relay_peer_id, + answer: P2pConnectionResponse::Accepted(answer.clone()), + }); + } + } if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(peer_id) { if let Some(callback) = diff --git a/p2p/src/connection/mod.rs b/p2p/src/connection/mod.rs index eabd6e5e88..3ea18ab9e4 100644 --- a/p2p/src/connection/mod.rs +++ b/p2p/src/connection/mod.rs @@ -17,55 +17,14 @@ pub use p2p_connection_service::*; use serde::{Deserialize, Serialize}; -use crate::webrtc; - -#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone, Copy, thiserror::Error)] -pub enum RejectionReason { - #[error("peer_id does not match peer's public key")] - PeerIdAndPublicKeyMismatch, - #[error("target peer_id is not local node's peer_id")] - TargetPeerIdNotMe, - #[error("too many peers")] - PeerCapacityFull, - #[error("peer already connected")] - AlreadyConnected, - #[error("self connection detected")] - ConnectingToSelf, -} - -impl RejectionReason { - pub fn is_bad(&self) -> bool { - match self { - Self::PeerIdAndPublicKeyMismatch => true, - Self::TargetPeerIdNotMe => true, - Self::PeerCapacityFull => false, - Self::AlreadyConnected => true, - Self::ConnectingToSelf => false, - } - } -} +pub use crate::webrtc::{Answer, Offer, P2pConnectionResponse, RejectionReason}; #[derive(Serialize, Deserialize, Debug, Clone, thiserror::Error)] pub enum P2pConnectionErrorResponse { #[error("connection rejected: {0}")] Rejected(RejectionReason), + #[error("signal decryption failed")] + SignalDecryptionFailed, #[error("internal error")] InternalError, } - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum P2pConnectionResponse { - Accepted(Box), - Rejected(RejectionReason), - InternalError, -} - -impl P2pConnectionResponse { - pub fn internal_error_str() -> &'static str { - "InternalError" - } - - pub fn internal_error_json_str() -> &'static str { - "\"InternalError\"" - } -} diff --git a/p2p/src/connection/outgoing/mod.rs b/p2p/src/connection/outgoing/mod.rs index 66706643bd..6bed5c356e 100644 --- a/p2p/src/connection/outgoing/mod.rs +++ b/p2p/src/connection/outgoing/mod.rs @@ -149,6 +149,20 @@ impl P2pConnectionOutgoingInitOpts { } } + pub fn can_connect_directly(&self) -> bool { + match self { + Self::LibP2P(..) => true, + Self::WebRTC { signaling, .. } => signaling.can_connect_directly(), + } + } + + pub fn webrtc_p2p_relay_peer_id(&self) -> Option { + match self { + Self::WebRTC { signaling, .. } => signaling.p2p_relay_peer_id(), + _ => None, + } + } + /// The OCaml implementation of Mina uses the `get_some_initial_peers` RPC to exchange peer information. /// Try to convert this RPC response into our peer address representation. /// Recognize a hack for marking the webrtc signaling server. @@ -226,6 +240,7 @@ impl P2pConnectionOutgoingInitOpts { (*peer_id).to_string().into_bytes().into(), ), }), + SignalingMethod::P2p { .. } => None, }, } } diff --git a/p2p/src/connection/outgoing/p2p_connection_outgoing_actions.rs b/p2p/src/connection/outgoing/p2p_connection_outgoing_actions.rs index 68b643f31a..17214837bf 100644 --- a/p2p/src/connection/outgoing/p2p_connection_outgoing_actions.rs +++ b/p2p/src/connection/outgoing/p2p_connection_outgoing_actions.rs @@ -189,6 +189,7 @@ impl redux::EnablingCondition for P2pConnectionOutgoingAction { matches!(s, P2pConnectionOutgoingState::OfferSdpCreatePending { .. }) } P2pConnectionOutgoingError::Rejected(_) + | P2pConnectionOutgoingError::RemoteSignalDecryptionFailed | P2pConnectionOutgoingError::RemoteInternalError => { matches!(s, P2pConnectionOutgoingState::AnswerRecvPending { .. }) } diff --git a/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs b/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs index 6da92bf33b..5768a9249c 100644 --- a/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs +++ b/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use openmina_core::{bug_condition, warn, Substate}; use crate::{ + channels::signaling::discovery::P2pChannelsSignalingDiscoveryAction, connection::{ outgoing_effectful::P2pConnectionOutgoingEffectfulAction, P2pConnectionErrorResponse, P2pConnectionState, @@ -44,7 +45,7 @@ impl P2pConnectionOutgoingState { .entry(*opts.peer_id()) .or_insert_with(|| P2pPeerState { is_libp2p: opts.is_libp2p(), - dial_opts: Some(opts.clone()), + dial_opts: Some(opts.clone()).filter(|v| v.can_connect_directly()), status: P2pPeerStatus::Connecting(P2pConnectionState::outgoing_init( opts, )), @@ -190,26 +191,34 @@ impl P2pConnectionOutgoingState { let state = p2p_state .outgoing_peer_connection_mut(peer_id) .ok_or_else(|| format!("Invalid state: {:?}", action))?; - if let Self::OfferSdpCreateSuccess { opts, rpc_id, .. } = state { - *state = Self::OfferReady { - time: meta.time(), - opts: opts.clone(), - offer: offer.clone(), - rpc_id: rpc_id.take(), - }; - } else { + let Self::OfferSdpCreateSuccess { opts, rpc_id, .. } = state else { bug_condition!( "Invalid state for `P2pConnectionOutgoingAction::OfferReady`: {:?}", state ); return Ok(()); - } + }; + let opts = opts.clone(); + *state = Self::OfferReady { + time: meta.time(), + opts: opts.clone(), + offer: offer.clone(), + rpc_id: rpc_id.take(), + }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pConnectionOutgoingEffectfulAction::OfferSend { - peer_id: *peer_id, - offer: offer.clone(), - }); + + if let Some(relay_peer_id) = opts.webrtc_p2p_relay_peer_id() { + dispatcher.push(P2pChannelsSignalingDiscoveryAction::DiscoveredAccept { + peer_id: relay_peer_id, + offer: offer.clone(), + }); + } else { + dispatcher.push(P2pConnectionOutgoingEffectfulAction::OfferSend { + peer_id: *peer_id, + offer: offer.clone(), + }); + } Ok(()) } P2pConnectionOutgoingAction::OfferSendSuccess { peer_id } => { @@ -276,6 +285,9 @@ impl P2pConnectionOutgoingState { P2pConnectionErrorResponse::Rejected(reason) => { P2pConnectionOutgoingError::Rejected(*reason) } + P2pConnectionErrorResponse::SignalDecryptionFailed => { + P2pConnectionOutgoingError::RemoteSignalDecryptionFailed + } P2pConnectionErrorResponse::InternalError => { P2pConnectionOutgoingError::RemoteInternalError } diff --git a/p2p/src/connection/outgoing/p2p_connection_outgoing_state.rs b/p2p/src/connection/outgoing/p2p_connection_outgoing_state.rs index 8f31a42d6f..0c4ec47c14 100644 --- a/p2p/src/connection/outgoing/p2p_connection_outgoing_state.rs +++ b/p2p/src/connection/outgoing/p2p_connection_outgoing_state.rs @@ -125,6 +125,8 @@ pub enum P2pConnectionOutgoingError { SdpCreateError(String), #[error("rejected: {0}")] Rejected(RejectionReason), + #[error("remote signal decryption failed")] + RemoteSignalDecryptionFailed, #[error("remote internal error")] RemoteInternalError, #[error("finalization error: {0}")] diff --git a/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs b/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs index 408128e106..104c576659 100644 --- a/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs +++ b/p2p/src/connection/outgoing_effectful/p2p_connection_outgoing_effectful_effects.rs @@ -60,6 +60,10 @@ impl P2pConnectionOutgoingEffectfulAction { }; service.http_signaling_request(url, *offer); } + webrtc::SignalingMethod::P2p { .. } => { + bug_condition!("`P2pConnectionOutgoingEffectfulAction::OfferSend` shouldn't be called for `webrtc::SignalingMethod::P2p`"); + return; + } } store.dispatch(P2pConnectionOutgoingAction::OfferSendSuccess { peer_id }); } diff --git a/p2p/src/disconnection/p2p_disconnection_actions.rs b/p2p/src/disconnection/p2p_disconnection_actions.rs index 58171ad5f0..aec90cef41 100644 --- a/p2p/src/disconnection/p2p_disconnection_actions.rs +++ b/p2p/src/disconnection/p2p_disconnection_actions.rs @@ -10,7 +10,7 @@ pub type P2pDisconnectionActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pDi #[action_event(fields(display(peer_id), display(reason)), level = info)] pub enum P2pDisconnectionAction { /// Initialize disconnection. - #[action_event(level = debug)] + #[action_event(fields(display(peer_id), display(reason)), level = info)] Init { peer_id: PeerId, reason: P2pDisconnectionReason, diff --git a/p2p/src/disconnection_effectful/p2p_disconnection_effectful_actions.rs b/p2p/src/disconnection_effectful/p2p_disconnection_effectful_actions.rs index 03efd82fd5..205aca2bea 100644 --- a/p2p/src/disconnection_effectful/p2p_disconnection_effectful_actions.rs +++ b/p2p/src/disconnection_effectful/p2p_disconnection_effectful_actions.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::{P2pPeerStatus, P2pState, PeerId}; #[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] -#[action_event(fields(display(peer_id), display(reason)), level = info)] +#[action_event(fields(display(peer_id), display(reason)), level = debug)] pub enum P2pDisconnectionEffectfulAction { /// Initialize disconnection. Init { peer_id: PeerId }, diff --git a/p2p/src/identity/mod.rs b/p2p/src/identity/mod.rs index 29addb3331..50947d1a9e 100644 --- a/p2p/src/identity/mod.rs +++ b/p2p/src/identity/mod.rs @@ -5,4 +5,4 @@ mod public_key; pub use public_key::PublicKey; mod secret_key; -pub use secret_key::SecretKey; +pub use secret_key::{EncryptableType, SecretKey}; diff --git a/p2p/src/identity/public_key.rs b/p2p/src/identity/public_key.rs index 14763b8661..44cd4ad680 100644 --- a/p2p/src/identity/public_key.rs +++ b/p2p/src/identity/public_key.rs @@ -1,5 +1,10 @@ -use std::{fmt, str::FromStr}; +use std::{ + fmt, + io::{Read, Write}, + str::FromStr, +}; +use binprot::{BinProtRead, BinProtWrite}; use ed25519_dalek::VerifyingKey as Ed25519PublicKey; use serde::{Deserialize, Serialize}; @@ -22,6 +27,10 @@ impl PublicKey { pub fn peer_id(&self) -> PeerId { PeerId::from_bytes(self.to_bytes()) } + + pub fn to_x25519(&self) -> x25519_dalek::PublicKey { + self.0.to_montgomery().to_bytes().into() + } } impl fmt::Display for PublicKey { @@ -98,3 +107,20 @@ impl<'de> serde::Deserialize<'de> for PublicKey { } } } + +impl BinProtWrite for PublicKey { + fn binprot_write(&self, w: &mut W) -> std::io::Result<()> { + w.write_all(&self.to_bytes()) + } +} + +impl BinProtRead for PublicKey { + fn binprot_read(r: &mut R) -> Result + where + Self: Sized, + { + let mut buf = [0; 32]; + r.read_exact(&mut buf)?; + Self::from_bytes(buf).map_err(|err| binprot::Error::CustomError(Box::new(err))) + } +} diff --git a/p2p/src/identity/secret_key.rs b/p2p/src/identity/secret_key.rs index 20d53f2ac4..5237391cc6 100644 --- a/p2p/src/identity/secret_key.rs +++ b/p2p/src/identity/secret_key.rs @@ -1,7 +1,7 @@ use std::{fmt, str::FromStr}; use ed25519_dalek::SigningKey as Ed25519SecretKey; -use rand::Rng; +use rand::{CryptoRng, Rng}; use serde::{Deserialize, Serialize}; use crate::identity::PublicKey; @@ -29,7 +29,7 @@ impl SecretKey { pub fn deterministic(i: usize) -> Self { let mut bytes = [0; 32]; let bytes_len = bytes.len(); - let i_bytes = i.to_be_bytes(); + let i_bytes = (i + 1).to_be_bytes(); let i = bytes_len - i_bytes.len(); bytes[i..bytes_len].copy_from_slice(&i_bytes); Self::from_bytes(bytes) @@ -46,6 +46,69 @@ impl SecretKey { pub fn public_key(&self) -> PublicKey { PublicKey(self.0.verifying_key()) } + + pub fn to_x25519(&self) -> x25519_dalek::StaticSecret { + self.0.to_scalar_bytes().into() + } +} + +use aes_gcm::{ + aead::{Aead, AeadCore}, + Aes256Gcm, KeyInit, +}; +impl SecretKey { + fn shared_key(&self, other_pk: &PublicKey) -> Result { + let key = self.to_x25519().diffie_hellman(&other_pk.to_x25519()); + if !key.was_contributory() { + return Err(()); + } + let key = key.to_bytes(); + // eprintln!("[shared_key] {} & {} = {}", self.public_key(), other_pk, hex::encode(&key)); + let key: &aes_gcm::Key = (&key).into(); + Ok(Aes256Gcm::new(key)) + } + + pub fn encrypt_raw( + &self, + other_pk: &PublicKey, + rng: impl Rng + CryptoRng, + data: &[u8], + ) -> Result, ()> { + let shared_key = self.shared_key(other_pk)?; + let nonce = Aes256Gcm::generate_nonce(rng); + let mut buffer = Vec::from(AsRef::<[u8]>::as_ref(&nonce)); + buffer.extend(shared_key.encrypt(&nonce, data).or(Err(()))?); + Ok(buffer) + } + + pub fn encrypt( + &self, + other_pk: &PublicKey, + rng: impl Rng + CryptoRng, + data: &M, + ) -> Result { + let data = serde_json::to_vec(data).or(Err(()))?; + self.encrypt_raw(other_pk, rng, &data).map(Into::into) + } + + pub fn decrypt_raw(&self, other_pk: &PublicKey, ciphertext: &[u8]) -> Result, ()> { + let shared_key = self.shared_key(other_pk)?; + let (nonce, ciphertext) = ciphertext.split_at_checked(12).ok_or(())?; + shared_key.decrypt(nonce.into(), ciphertext).or(Err(())) + } + + pub fn decrypt( + &self, + other_pk: &PublicKey, + ciphertext: &M::Encrypted, + ) -> Result { + let data = self.decrypt_raw(other_pk, ciphertext.as_ref())?; + serde_json::from_slice(&data).or(Err(())) + } +} + +pub trait EncryptableType: Serialize + for<'a> Deserialize<'a> { + type Encrypted: From> + AsRef<[u8]>; } impl fmt::Display for SecretKey { @@ -92,6 +155,16 @@ impl TryFrom for libp2p_identity::Keypair { } } +#[cfg(feature = "p2p-libp2p")] +impl TryFrom for SecretKey { + type Error = (); + + fn try_from(value: libp2p_identity::Keypair) -> Result { + let bytes = value.try_into_ed25519().or(Err(()))?.to_bytes(); + Ok(Self::from_bytes(bytes[0..32].try_into().or(Err(()))?)) + } +} + impl Serialize for SecretKey { fn serialize(&self, serializer: S) -> Result where diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 19ec7c580e..92000312c4 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -6,9 +6,18 @@ pub mod disconnection_effectful; pub mod identity; use bootstrap::P2pNetworkKadBootstrapState; use channels::{ - best_tip::P2pChannelsBestTipAction, best_tip_effectful::P2pChannelsBestTipEffectfulAction, - rpc::P2pChannelsRpcAction, rpc_effectful::P2pChannelsRpcEffectfulAction, - snark::P2pChannelsSnarkAction, snark_effectful::P2pChannelsSnarkEffectfulAction, + best_tip::P2pChannelsBestTipAction, + best_tip_effectful::P2pChannelsBestTipEffectfulAction, + rpc::P2pChannelsRpcAction, + rpc_effectful::P2pChannelsRpcEffectfulAction, + signaling::{ + discovery::P2pChannelsSignalingDiscoveryAction, + discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, + exchange::P2pChannelsSignalingExchangeAction, + exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, + }, + snark::P2pChannelsSnarkAction, + snark_effectful::P2pChannelsSnarkEffectfulAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, streaming_rpc::P2pChannelsStreamingRpcAction, @@ -126,6 +135,10 @@ pub trait P2pActionTrait: + From + From + From + + From + + From + + From + + From + From + From + From diff --git a/p2p/src/p2p_effects.rs b/p2p/src/p2p_effects.rs index cd4d591c6f..5a360e5722 100644 --- a/p2p/src/p2p_effects.rs +++ b/p2p/src/p2p_effects.rs @@ -13,6 +13,12 @@ impl P2pEffectfulAction { match self { P2pEffectfulAction::Initialize => {} P2pEffectfulAction::Channels(action) => match action { + P2pChannelsEffectfulAction::SignalingDiscovery(action) => { + action.effects(&meta, store) + } + P2pChannelsEffectfulAction::SignalingExchange(action) => { + action.effects(&meta, store) + } P2pChannelsEffectfulAction::BestTip(action) => action.effects(&meta, store), P2pChannelsEffectfulAction::Transaction(action) => action.effects(&meta, store), P2pChannelsEffectfulAction::StreamingRpc(action) => action.effects(&meta, store), diff --git a/p2p/src/p2p_event.rs b/p2p/src/p2p_event.rs index f4d84a6249..b349831a1f 100644 --- a/p2p/src/p2p_event.rs +++ b/p2p/src/p2p_event.rs @@ -4,6 +4,8 @@ use std::net::{IpAddr, SocketAddr}; use derive_more::From; use serde::{Deserialize, Serialize}; +use crate::channels::signaling::discovery::SignalingDiscoveryChannelMsg; +use crate::channels::signaling::exchange::SignalingExchangeChannelMsg; use crate::channels::streaming_rpc::StreamingRpcChannelMsg; use crate::ConnectionAddr; use crate::{ @@ -105,6 +107,9 @@ impl fmt::Display for P2pConnectionEvent { P2pConnectionResponse::Rejected(reason) => { write!(f, "AnswerReceived, {peer_id}, Rejected, {reason:?}") } + P2pConnectionResponse::SignalDecryptionFailed => { + write!(f, "SignalDecryptionFailed, {peer_id}") + } P2pConnectionResponse::InternalError => { write!(f, "AnswerReceived, {peer_id}, InternalError") } @@ -148,6 +153,27 @@ impl fmt::Display for P2pChannelEvent { }; match msg { + ChannelMsg::SignalingDiscovery(v) => match v { + SignalingDiscoveryChannelMsg::GetNext => write!(f, "GetNext"), + SignalingDiscoveryChannelMsg::Discover => write!(f, "Discover"), + SignalingDiscoveryChannelMsg::Discovered { target_public_key } => { + write!(f, "Discovered, {}", target_public_key.peer_id()) + } + SignalingDiscoveryChannelMsg::DiscoveredReject => write!(f, "Discovered"), + SignalingDiscoveryChannelMsg::DiscoveredAccept(_) => { + write!(f, "DiscoveredAccept") + } + SignalingDiscoveryChannelMsg::Answer(_) => write!(f, "Answer"), + }, + ChannelMsg::SignalingExchange(v) => match v { + SignalingExchangeChannelMsg::GetNext => write!(f, "GetNext"), + SignalingExchangeChannelMsg::OfferToYou { + offerer_pub_key, .. + } => { + write!(f, "OfferToYou, {}", offerer_pub_key.peer_id()) + } + SignalingExchangeChannelMsg::Answer(_) => write!(f, "Answer"), + }, ChannelMsg::BestTipPropagation(v) => { match v { BestTipPropagationChannelMsg::GetNext => write!(f, "GetNext"), diff --git a/p2p/src/p2p_reducer.rs b/p2p/src/p2p_reducer.rs index 4b8775ea76..9cf1022202 100644 --- a/p2p/src/p2p_reducer.rs +++ b/p2p/src/p2p_reducer.rs @@ -1,6 +1,7 @@ use crate::{ channels::{ - rpc::P2pChannelsRpcAction, streaming_rpc::P2pChannelsStreamingRpcAction, P2pChannelsState, + rpc::P2pChannelsRpcAction, signaling::discovery::P2pChannelsSignalingDiscoveryAction, + streaming_rpc::P2pChannelsStreamingRpcAction, P2pChannelsState, }, connection::{ incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction, @@ -189,6 +190,14 @@ impl P2pState { return Ok(()); } + for (&peer_id, _) in self + .ready_peers_iter() + .filter(|(_, peer)| peer.channels.signaling.discovery.is_ready()) + { + dispatcher.push(P2pChannelsSignalingDiscoveryAction::RequestSend { peer_id }); + dispatcher.push(P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { peer_id }); + } + if let Some(_d) = config.timeouts.initial_peers { // ask initial peers // TODO: use RPC to ask initial peers diff --git a/p2p/src/service_impl/mod.rs b/p2p/src/service_impl/mod.rs index 5cd808cbe1..f5d871677f 100644 --- a/p2p/src/service_impl/mod.rs +++ b/p2p/src/service_impl/mod.rs @@ -21,7 +21,7 @@ pub mod webrtc { use crate::{ channels::{ChannelId, ChannelMsg, MsgId}, connection::outgoing::P2pConnectionOutgoingInitOpts, - identity::SecretKey, + identity::{EncryptableType, PublicKey, SecretKey}, webrtc, P2pEvent, PeerId, }; @@ -72,5 +72,17 @@ pub mod webrtc { fn channel_open(&mut self, peer_id: PeerId, id: ChannelId) {} fn channel_send(&mut self, peer_id: PeerId, msg_id: MsgId, msg: ChannelMsg) {} + + fn encrypt( + &mut self, + other_pk: &PublicKey, + message: &T, + ) -> Result; + + fn decrypt( + &mut self, + other_pub_key: &PublicKey, + encrypted: &T::Encrypted, + ) -> Result; } } diff --git a/p2p/src/service_impl/webrtc/mod.rs b/p2p/src/service_impl/webrtc/mod.rs index 9a396a4745..9fea2a0300 100644 --- a/p2p/src/service_impl/webrtc/mod.rs +++ b/p2p/src/service_impl/webrtc/mod.rs @@ -17,6 +17,7 @@ use wasm_bindgen_futures::spawn_local; use openmina_core::channels::{mpsc, oneshot}; +use crate::identity::{EncryptableType, PublicKey}; use crate::{ channels::{ChannelId, ChannelMsg, MsgId}, connection::outgoing::P2pConnectionOutgoingInitOpts, @@ -183,13 +184,17 @@ impl Drop for RTCChannel { } } -async fn wait_for_ice_gathering_complete(pc: &RTCConnection) { - let timeout = Duration::from_secs(3); - +async fn sleep(dur: Duration) { #[cfg(not(target_arch = "wasm32"))] - let timeout = tokio::time::sleep(timeout); + let fut = tokio::time::sleep(dur); #[cfg(target_arch = "wasm32")] - let timeout = gloo_timers::future::TimeoutFuture::new(timeout.as_millis() as u32); + let fut = gloo_timers::future::TimeoutFuture::new(dur.as_millis() as u32); + fut.await +} + +async fn wait_for_ice_gathering_complete(pc: &RTCConnection) { + let timeout = sleep(Duration::from_secs(3)); + tokio::select! { _ = timeout => {} _ = pc.wait_for_ice_gathering_complete() => {} @@ -512,6 +517,11 @@ async fn peer_loop( let chan_clone = chan.clone(); let event_sender_clone = event_sender.clone(); spawn_local(async move { + // Add a delay for sending messages after channel + // was opened. Some initial messages get lost otherwise. + // TODO(binier): find deeper cause and fix it. + sleep(Duration::from_secs(3)).await; + while let Some((msg_id, encoded)) = sender_rx.recv().await { let encoded = bytes::Bytes::from(encoded); let mut chunks = @@ -714,4 +724,16 @@ pub trait P2pServiceWebrtc: redux::Service { let _ = peer.cmd_sender.send(PeerCmd::ChannelSend(msg_id, msg)); } } + + fn encrypt( + &mut self, + other_pk: &PublicKey, + message: &T, + ) -> Result; + + fn decrypt( + &mut self, + other_pub_key: &PublicKey, + encrypted: &T::Encrypted, + ) -> Result; } diff --git a/p2p/src/service_impl/webrtc_with_libp2p.rs b/p2p/src/service_impl/webrtc_with_libp2p.rs index 299e183452..a665eca348 100644 --- a/p2p/src/service_impl/webrtc_with_libp2p.rs +++ b/p2p/src/service_impl/webrtc_with_libp2p.rs @@ -149,6 +149,22 @@ impl P2pChannelsService for T { P2pServiceWebrtc::channel_send(self, peer_id, msg_id, msg) } } + + fn encrypt( + &mut self, + other_pk: &crate::identity::PublicKey, + message: &M, + ) -> Result { + P2pServiceWebrtc::encrypt(self, other_pk, message) + } + + fn decrypt( + &mut self, + other_pk: &crate::identity::PublicKey, + encrypted: &M::Encrypted, + ) -> Result { + P2pServiceWebrtc::decrypt(self, other_pk, encrypted) + } } #[cfg(feature = "p2p-libp2p")] diff --git a/p2p/src/webrtc/mod.rs b/p2p/src/webrtc/mod.rs index 3500ddfc63..8a38c6eeb1 100644 --- a/p2p/src/webrtc/mod.rs +++ b/p2p/src/webrtc/mod.rs @@ -2,7 +2,9 @@ mod host; pub use host::Host; mod signal; -pub use signal::{Answer, Offer, Signal}; +pub use signal::{ + Answer, EncryptedAnswer, EncryptedOffer, Offer, P2pConnectionResponse, RejectionReason, Signal, +}; mod signaling_method; pub use signaling_method::{HttpSignalingInfo, SignalingMethod, SignalingMethodParseError}; diff --git a/p2p/src/webrtc/signal.rs b/p2p/src/webrtc/signal.rs index 5cb2e96bdc..47fde57da6 100644 --- a/p2p/src/webrtc/signal.rs +++ b/p2p/src/webrtc/signal.rs @@ -1,7 +1,8 @@ +use binprot_derive::{BinProtRead, BinProtWrite}; use derive_more::From; use serde::{Deserialize, Serialize}; -use crate::identity::{PeerId, PublicKey}; +use crate::identity::{EncryptableType, PeerId, PublicKey}; use super::Host; @@ -33,3 +34,75 @@ pub enum Signal { Offer(Offer), Answer(Answer), } + +#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone, Copy, thiserror::Error)] +pub enum RejectionReason { + #[error("peer_id does not match peer's public key")] + PeerIdAndPublicKeyMismatch, + #[error("target peer_id is not local node's peer_id")] + TargetPeerIdNotMe, + #[error("too many peers")] + PeerCapacityFull, + #[error("peer already connected")] + AlreadyConnected, + #[error("self connection detected")] + ConnectingToSelf, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum P2pConnectionResponse { + Accepted(Box), + Rejected(RejectionReason), + SignalDecryptionFailed, + InternalError, +} + +impl RejectionReason { + pub fn is_bad(&self) -> bool { + match self { + Self::PeerIdAndPublicKeyMismatch => true, + Self::TargetPeerIdNotMe => true, + Self::PeerCapacityFull => false, + Self::AlreadyConnected => true, + Self::ConnectingToSelf => false, + } + } +} + +impl P2pConnectionResponse { + pub fn internal_error_str() -> &'static str { + "InternalError" + } + + pub fn internal_error_json_str() -> &'static str { + "\"InternalError\"" + } +} + +/// Encrypted `webrtc::Offer`. +#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)] +pub struct EncryptedOffer(Vec); + +/// Encrypted `P2pConnectionResponse`. +#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)] +pub struct EncryptedAnswer(Vec); + +impl AsRef<[u8]> for EncryptedOffer { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +impl AsRef<[u8]> for EncryptedAnswer { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +impl EncryptableType for Offer { + type Encrypted = EncryptedOffer; +} + +impl EncryptableType for P2pConnectionResponse { + type Encrypted = EncryptedAnswer; +} diff --git a/p2p/src/webrtc/signaling_method/mod.rs b/p2p/src/webrtc/signaling_method/mod.rs index 45f9672657..ff09d5a7d8 100644 --- a/p2p/src/webrtc/signaling_method/mod.rs +++ b/p2p/src/webrtc/signaling_method/mod.rs @@ -7,26 +7,43 @@ use binprot_derive::{BinProtRead, BinProtWrite}; use serde::{Deserialize, Serialize}; use thiserror::Error; +use crate::PeerId; + #[derive(BinProtWrite, BinProtRead, Eq, PartialEq, Ord, PartialOrd, Debug, Clone)] pub enum SignalingMethod { Http(HttpSignalingInfo), Https(HttpSignalingInfo), + P2p { relay_peer_id: PeerId }, } impl SignalingMethod { + pub fn can_connect_directly(&self) -> bool { + match self { + Self::Http(_) | Self::Https(_) => true, + Self::P2p { .. } => false, + } + } + /// If method is http or https, it will return url to which an /// offer can be sent. pub fn http_url(&self) -> Option { let (http, info) = match self { Self::Http(info) => ("http", info), Self::Https(info) => ("https", info), - // _ => return None, + _ => return None, }; Some(format!( "{http}://{}:{}/mina/webrtc/signal", info.host, info.port, )) } + + pub fn p2p_relay_peer_id(&self) -> Option { + match self { + Self::P2p { relay_peer_id } => Some(*relay_peer_id), + _ => None, + } + } } impl fmt::Display for SignalingMethod { @@ -40,6 +57,9 @@ impl fmt::Display for SignalingMethod { write!(f, "/https")?; signaling.fmt(f) } + Self::P2p { relay_peer_id } => { + write!(f, "/p2p/{relay_peer_id}") + } } } } diff --git a/p2p/testing/src/redux.rs b/p2p/testing/src/redux.rs index 4ef1ce521b..86b386bf5c 100644 --- a/p2p/testing/src/redux.rs +++ b/p2p/testing/src/redux.rs @@ -12,9 +12,18 @@ use openmina_core::{ use p2p::{ bootstrap::P2pNetworkKadBootstrapState, channels::{ - best_tip::P2pChannelsBestTipAction, best_tip_effectful::P2pChannelsBestTipEffectfulAction, - rpc::P2pChannelsRpcAction, rpc_effectful::P2pChannelsRpcEffectfulAction, - snark::P2pChannelsSnarkAction, snark_effectful::P2pChannelsSnarkEffectfulAction, + best_tip::P2pChannelsBestTipAction, + best_tip_effectful::P2pChannelsBestTipEffectfulAction, + rpc::P2pChannelsRpcAction, + rpc_effectful::P2pChannelsRpcEffectfulAction, + signaling::{ + discovery::P2pChannelsSignalingDiscoveryAction, + discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, + exchange::P2pChannelsSignalingExchangeAction, + exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, + }, + snark::P2pChannelsSnarkAction, + snark_effectful::P2pChannelsSnarkEffectfulAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, streaming_rpc::P2pChannelsStreamingRpcAction, @@ -286,6 +295,8 @@ impl_from_p2p!(p2p::P2pNetworkPnetAction); impl_from_p2p!(p2p::P2pNetworkNoiseAction); impl_from_p2p!(p2p::connection::incoming::P2pConnectionIncomingAction); impl_from_p2p!(p2p::P2pNetworkPubsubAction); +impl_from_p2p!(P2pChannelsSignalingDiscoveryAction); +impl_from_p2p!(P2pChannelsSignalingExchangeAction); impl_from_p2p!(P2pChannelsTransactionAction); impl_from_p2p!(P2pChannelsSnarkAction); impl_from_p2p!(p2p::P2pNetworkRpcAction); @@ -303,6 +314,8 @@ impl_from_p2p!(effectful p2p::P2pNetworkPubsubEffectfulAction); impl_from_p2p!(effectful P2pNetworkIdentifyStreamEffectfulAction); impl_from_p2p!(effectful P2pConnectionOutgoingEffectfulAction); impl_from_p2p!(effectful P2pDisconnectionEffectfulAction); +impl_from_p2p!(effectful P2pChannelsSignalingDiscoveryEffectfulAction); +impl_from_p2p!(effectful P2pChannelsSignalingExchangeEffectfulAction); impl_from_p2p!(effectful P2pChannelsBestTipEffectfulAction); impl_from_p2p!(effectful P2pChannelsStreamingRpcEffectfulAction); impl_from_p2p!(effectful P2pChannelsTransactionEffectfulAction); diff --git a/p2p/testing/src/service.rs b/p2p/testing/src/service.rs index 1237415723..67044f5f0f 100644 --- a/p2p/testing/src/service.rs +++ b/p2p/testing/src/service.rs @@ -99,6 +99,22 @@ impl P2pServiceWebrtc for ClusterService { ) -> &mut std::collections::BTreeMap { &mut self.peers } + + fn encrypt( + &mut self, + _other_pk: &p2p::identity::PublicKey, + _message: &T, + ) -> Result { + unreachable!("this is webrtc only and this crate tests libp2p only") + } + + fn decrypt( + &mut self, + _other_pub_key: &p2p::identity::PublicKey, + _encrypted: &T::Encrypted, + ) -> Result { + unreachable!("this is webrtc only and this crate tests libp2p only") + } } impl P2pCryptoService for ClusterService {