diff --git a/node/src/action.rs b/node/src/action.rs index 56c7d48394..3793d97058 100644 --- a/node/src/action.rs +++ b/node/src/action.rs @@ -113,6 +113,9 @@ impl redux::EnablingCondition for Action { impl From for Action { fn from(action: redux::AnyAction) -> Self { - *action.0.downcast::().expect("Downcast failed") + match action.0.downcast() { + Ok(action) => *action, + Err(action) => Self::P2p(*action.downcast().expect("Downcast failed")), + } } } diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index a74fbaebf2..31606dbc4d 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -29,21 +29,13 @@ use crate::ledger::LedgerAction; use crate::ledger_effectful::LedgerEffectfulAction; use crate::p2p::callbacks::P2pCallbacksAction; use crate::p2p::channels::best_tip::P2pChannelsBestTipAction; -use crate::p2p::channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction; use crate::p2p::channels::rpc::P2pChannelsRpcAction; -use crate::p2p::channels::rpc_effectful::P2pChannelsRpcEffectfulAction; use crate::p2p::channels::signaling::discovery::P2pChannelsSignalingDiscoveryAction; -use crate::p2p::channels::signaling::discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction; use crate::p2p::channels::signaling::exchange::P2pChannelsSignalingExchangeAction; -use crate::p2p::channels::signaling::exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction; use crate::p2p::channels::snark::P2pChannelsSnarkAction; -use crate::p2p::channels::snark_effectful::P2pChannelsSnarkEffectfulAction; use crate::p2p::channels::snark_job_commitment::P2pChannelsSnarkJobCommitmentAction; -use crate::p2p::channels::snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction; use crate::p2p::channels::streaming_rpc::P2pChannelsStreamingRpcAction; -use crate::p2p::channels::streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction; use crate::p2p::channels::transaction::P2pChannelsTransactionAction; -use crate::p2p::channels::transaction_effectful::P2pChannelsTransactionEffectfulAction; use crate::p2p::channels::{ P2pChannelsAction, P2pChannelsEffectfulAction, P2pChannelsMessageReceivedAction, }; @@ -221,9 +213,12 @@ pub enum ActionKind { P2pChannelsBestTipRequestReceived, P2pChannelsBestTipRequestSend, P2pChannelsBestTipResponseSend, - P2pChannelsBestTipEffectfulInit, - P2pChannelsBestTipEffectfulRequestSend, - P2pChannelsBestTipEffectfulResponseSend, + P2pChannelsEffectfulInitChannel, + P2pChannelsEffectfulMessageSend, + P2pChannelsEffectfulSignalingDiscoveryAnswerDecrypt, + P2pChannelsEffectfulSignalingDiscoveryOfferEncryptAndSend, + P2pChannelsEffectfulSignalingExchangeAnswerEncryptAndSend, + P2pChannelsEffectfulSignalingExchangeOfferDecrypt, P2pChannelsMessageReceived, P2pChannelsRpcInit, P2pChannelsRpcPending, @@ -234,9 +229,6 @@ pub enum ActionKind { P2pChannelsRpcResponseReceived, P2pChannelsRpcResponseSend, P2pChannelsRpcTimeout, - P2pChannelsRpcEffectfulInit, - P2pChannelsRpcEffectfulRequestSend, - P2pChannelsRpcEffectfulResponseSend, P2pChannelsSignalingDiscoveryAnswerDecrypted, P2pChannelsSignalingDiscoveryAnswerReceived, P2pChannelsSignalingDiscoveryAnswerSend, @@ -253,10 +245,6 @@ pub enum ActionKind { P2pChannelsSignalingDiscoveryReady, P2pChannelsSignalingDiscoveryRequestReceived, P2pChannelsSignalingDiscoveryRequestSend, - P2pChannelsSignalingDiscoveryEffectfulAnswerDecrypt, - P2pChannelsSignalingDiscoveryEffectfulInit, - P2pChannelsSignalingDiscoveryEffectfulMessageSend, - P2pChannelsSignalingDiscoveryEffectfulOfferEncryptAndSend, P2pChannelsSignalingExchangeAnswerReceived, P2pChannelsSignalingExchangeAnswerSend, P2pChannelsSignalingExchangeInit, @@ -268,10 +256,6 @@ pub enum ActionKind { P2pChannelsSignalingExchangeReady, P2pChannelsSignalingExchangeRequestReceived, P2pChannelsSignalingExchangeRequestSend, - P2pChannelsSignalingExchangeEffectfulAnswerEncryptAndSend, - P2pChannelsSignalingExchangeEffectfulInit, - P2pChannelsSignalingExchangeEffectfulMessageSend, - P2pChannelsSignalingExchangeEffectfulOfferDecrypt, P2pChannelsSnarkInit, P2pChannelsSnarkLibp2pBroadcast, P2pChannelsSnarkLibp2pReceived, @@ -282,9 +266,6 @@ pub enum ActionKind { P2pChannelsSnarkRequestReceived, P2pChannelsSnarkRequestSend, P2pChannelsSnarkResponseSend, - P2pChannelsSnarkEffectfulInit, - P2pChannelsSnarkEffectfulRequestSend, - P2pChannelsSnarkEffectfulResponseSend, P2pChannelsSnarkJobCommitmentInit, P2pChannelsSnarkJobCommitmentPending, P2pChannelsSnarkJobCommitmentPromiseReceived, @@ -293,9 +274,6 @@ pub enum ActionKind { P2pChannelsSnarkJobCommitmentRequestReceived, P2pChannelsSnarkJobCommitmentRequestSend, P2pChannelsSnarkJobCommitmentResponseSend, - P2pChannelsSnarkJobCommitmentEffectfulInit, - P2pChannelsSnarkJobCommitmentEffectfulRequestSend, - P2pChannelsSnarkJobCommitmentEffectfulResponseSend, P2pChannelsStreamingRpcInit, P2pChannelsStreamingRpcPending, P2pChannelsStreamingRpcReady, @@ -310,11 +288,6 @@ pub enum ActionKind { P2pChannelsStreamingRpcResponseSendInit, P2pChannelsStreamingRpcResponseSent, P2pChannelsStreamingRpcTimeout, - P2pChannelsStreamingRpcEffectfulInit, - P2pChannelsStreamingRpcEffectfulRequestSend, - P2pChannelsStreamingRpcEffectfulResponseNextPartGet, - P2pChannelsStreamingRpcEffectfulResponsePartSend, - P2pChannelsStreamingRpcEffectfulResponseSendInit, P2pChannelsTransactionInit, P2pChannelsTransactionLibp2pBroadcast, P2pChannelsTransactionLibp2pReceived, @@ -325,9 +298,6 @@ pub enum ActionKind { P2pChannelsTransactionRequestReceived, P2pChannelsTransactionRequestSend, P2pChannelsTransactionResponseSend, - P2pChannelsTransactionEffectfulInit, - P2pChannelsTransactionEffectfulRequestSend, - P2pChannelsTransactionEffectfulResponseSend, P2pConnectionIncomingAnswerReady, P2pConnectionIncomingAnswerSdpCreateError, P2pConnectionIncomingAnswerSdpCreatePending, @@ -718,7 +688,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 601; + pub const COUNT: u16 = 579; } impl std::fmt::Display for ActionKind { @@ -1284,14 +1254,20 @@ impl ActionKindGet for P2pNetworkAction { impl ActionKindGet for P2pChannelsEffectfulAction { fn kind(&self) -> ActionKind { match self { - Self::SignalingDiscovery(a) => a.kind(), - Self::SignalingExchange(a) => a.kind(), - Self::BestTip(a) => a.kind(), - Self::Rpc(a) => a.kind(), - Self::Snark(a) => a.kind(), - Self::SnarkJobCommitment(a) => a.kind(), - Self::StreamingRpc(a) => a.kind(), - Self::Transaction(a) => a.kind(), + Self::InitChannel { .. } => ActionKind::P2pChannelsEffectfulInitChannel, + Self::MessageSend { .. } => ActionKind::P2pChannelsEffectfulMessageSend, + Self::SignalingDiscoveryAnswerDecrypt { .. } => { + ActionKind::P2pChannelsEffectfulSignalingDiscoveryAnswerDecrypt + } + Self::SignalingDiscoveryOfferEncryptAndSend { .. } => { + ActionKind::P2pChannelsEffectfulSignalingDiscoveryOfferEncryptAndSend + } + Self::SignalingExchangeOfferDecrypt { .. } => { + ActionKind::P2pChannelsEffectfulSignalingExchangeOfferDecrypt + } + Self::SignalingExchangeAnswerEncryptAndSend { .. } => { + ActionKind::P2pChannelsEffectfulSignalingExchangeAnswerEncryptAndSend + } } } } @@ -1946,112 +1922,6 @@ impl ActionKindGet for P2pNetworkRpcAction { } } -impl ActionKindGet for P2pChannelsSignalingDiscoveryEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsSignalingDiscoveryEffectfulInit, - Self::MessageSend { .. } => { - ActionKind::P2pChannelsSignalingDiscoveryEffectfulMessageSend - } - Self::OfferEncryptAndSend { .. } => { - ActionKind::P2pChannelsSignalingDiscoveryEffectfulOfferEncryptAndSend - } - Self::AnswerDecrypt { .. } => { - ActionKind::P2pChannelsSignalingDiscoveryEffectfulAnswerDecrypt - } - } - } -} - -impl ActionKindGet for P2pChannelsSignalingExchangeEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsSignalingExchangeEffectfulInit, - Self::MessageSend { .. } => { - ActionKind::P2pChannelsSignalingExchangeEffectfulMessageSend - } - Self::OfferDecrypt { .. } => { - ActionKind::P2pChannelsSignalingExchangeEffectfulOfferDecrypt - } - Self::AnswerEncryptAndSend { .. } => { - ActionKind::P2pChannelsSignalingExchangeEffectfulAnswerEncryptAndSend - } - } - } -} - -impl ActionKindGet for P2pChannelsBestTipEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsBestTipEffectfulInit, - Self::RequestSend { .. } => ActionKind::P2pChannelsBestTipEffectfulRequestSend, - Self::ResponseSend { .. } => ActionKind::P2pChannelsBestTipEffectfulResponseSend, - } - } -} - -impl ActionKindGet for P2pChannelsRpcEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsRpcEffectfulInit, - Self::RequestSend { .. } => ActionKind::P2pChannelsRpcEffectfulRequestSend, - Self::ResponseSend { .. } => ActionKind::P2pChannelsRpcEffectfulResponseSend, - } - } -} - -impl ActionKindGet for P2pChannelsSnarkEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsSnarkEffectfulInit, - Self::RequestSend { .. } => ActionKind::P2pChannelsSnarkEffectfulRequestSend, - Self::ResponseSend { .. } => ActionKind::P2pChannelsSnarkEffectfulResponseSend, - } - } -} - -impl ActionKindGet for P2pChannelsSnarkJobCommitmentEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsSnarkJobCommitmentEffectfulInit, - Self::RequestSend { .. } => { - ActionKind::P2pChannelsSnarkJobCommitmentEffectfulRequestSend - } - Self::ResponseSend { .. } => { - ActionKind::P2pChannelsSnarkJobCommitmentEffectfulResponseSend - } - } - } -} - -impl ActionKindGet for P2pChannelsStreamingRpcEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsStreamingRpcEffectfulInit, - Self::RequestSend { .. } => ActionKind::P2pChannelsStreamingRpcEffectfulRequestSend, - Self::ResponseNextPartGet { .. } => { - ActionKind::P2pChannelsStreamingRpcEffectfulResponseNextPartGet - } - Self::ResponseSendInit { .. } => { - ActionKind::P2pChannelsStreamingRpcEffectfulResponseSendInit - } - Self::ResponsePartSend { .. } => { - ActionKind::P2pChannelsStreamingRpcEffectfulResponsePartSend - } - } - } -} - -impl ActionKindGet for P2pChannelsTransactionEffectfulAction { - fn kind(&self) -> ActionKind { - match self { - Self::Init { .. } => ActionKind::P2pChannelsTransactionEffectfulInit, - Self::RequestSend { .. } => ActionKind::P2pChannelsTransactionEffectfulRequestSend, - Self::ResponseSend { .. } => ActionKind::P2pChannelsTransactionEffectfulResponseSend, - } - } -} - impl ActionKindGet for P2pConnectionOutgoingEffectfulAction { fn kind(&self) -> ActionKind { match self { diff --git a/node/src/logger/logger_effects.rs b/node/src/logger/logger_effects.rs index 7f7a7d939a..8e60710c26 100644 --- a/node/src/logger/logger_effects.rs +++ b/node/src/logger/logger_effects.rs @@ -1,7 +1,6 @@ use openmina_core::log::inner::field::{display, DisplayValue}; use openmina_core::log::inner::Value; use openmina_core::log::{time_to_str, ActionEvent, EventContext}; -use p2p::channels::P2pChannelsEffectfulAction; use p2p::connection::P2pConnectionEffectfulAction; use p2p::{P2pNetworkConnectionError, P2pNetworkSchedulerAction, PeerId}; @@ -106,22 +105,7 @@ pub fn logger_effects(store: &Store, action: ActionWithMetaRef<'_ }, }, Action::P2pEffectful(action) => match action { - p2p::P2pEffectfulAction::Channels(action) => match action { - P2pChannelsEffectfulAction::SignalingDiscovery(action) => { - action.action_event(&context) - } - P2pChannelsEffectfulAction::SignalingExchange(action) => { - action.action_event(&context) - } - P2pChannelsEffectfulAction::BestTip(action) => action.action_event(&context), - P2pChannelsEffectfulAction::Rpc(action) => action.action_event(&context), - P2pChannelsEffectfulAction::StreamingRpc(action) => action.action_event(&context), - P2pChannelsEffectfulAction::SnarkJobCommitment(action) => { - action.action_event(&context) - } - P2pChannelsEffectfulAction::Snark(action) => action.action_event(&context), - P2pChannelsEffectfulAction::Transaction(action) => action.action_event(&context), - }, + p2p::P2pEffectfulAction::Channels(action) => action.action_event(&context), p2p::P2pEffectfulAction::Connection(action) => match action { P2pConnectionEffectfulAction::Outgoing(action) => action.action_event(&context), P2pConnectionEffectfulAction::Incoming(action) => action.action_event(&context), diff --git a/node/src/p2p/mod.rs b/node/src/p2p/mod.rs index 06d72936dc..991f4242c5 100644 --- a/node/src/p2p/mod.rs +++ b/node/src/p2p/mod.rs @@ -1,17 +1,6 @@ pub use ::p2p::*; use p2p::{ - channels::{ - best_tip_effectful::P2pChannelsBestTipEffectfulAction, - rpc_effectful::P2pChannelsRpcEffectfulAction, - signaling::{ - discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, - exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, - }, - snark_effectful::P2pChannelsSnarkEffectfulAction, - snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, - streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction, - transaction_effectful::P2pChannelsTransactionEffectfulAction, - }, + channels::P2pChannelsEffectfulAction, network::identify::stream_effectful::P2pNetworkIdentifyStreamEffectfulAction, }; @@ -138,17 +127,8 @@ impl_into_global_action!(effectful p2p::P2pNetworkPnetEffectfulAction); impl_into_global_action!(effectful connection::incoming_effectful::P2pConnectionIncomingEffectfulAction); impl_into_global_action!(effectful connection::outgoing_effectful::P2pConnectionOutgoingEffectfulAction); impl_into_global_action!(effectful p2p::disconnection_effectful::P2pDisconnectionEffectfulAction); -impl_into_global_action!( - effectful P2pChannelsSignalingDiscoveryEffectfulAction -); -impl_into_global_action!(effectful P2pChannelsSignalingExchangeEffectfulAction); -impl_into_global_action!(effectful P2pChannelsBestTipEffectfulAction); -impl_into_global_action!(effectful P2pChannelsStreamingRpcEffectfulAction); -impl_into_global_action!(effectful P2pChannelsTransactionEffectfulAction); -impl_into_global_action!(effectful P2pChannelsSnarkJobCommitmentEffectfulAction); -impl_into_global_action!(effectful P2pChannelsRpcEffectfulAction); -impl_into_global_action!(effectful P2pChannelsSnarkEffectfulAction); impl_into_global_action!(effectful network::pubsub::P2pNetworkPubsubEffectfulAction); impl_into_global_action!(effectful P2pNetworkIdentifyStreamEffectfulAction); +impl_into_global_action!(effectful P2pChannelsEffectfulAction); impl p2p::P2pActionTrait for crate::Action {} diff --git a/p2p/src/channels/best_tip/p2p_channels_best_tip_reducer.rs b/p2p/src/channels/best_tip/p2p_channels_best_tip_reducer.rs index c4bdf794d5..c3a91bf759 100644 --- a/p2p/src/channels/best_tip/p2p_channels_best_tip_reducer.rs +++ b/p2p/src/channels/best_tip/p2p_channels_best_tip_reducer.rs @@ -2,11 +2,14 @@ use openmina_core::{bug_condition, Substate}; use redux::ActionWithMeta; use crate::{ - channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction, P2pNetworkPubsubAction, - P2pPeerAction, P2pState, + channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction}, + P2pNetworkPubsubAction, P2pPeerAction, P2pState, PeerId, }; -use super::{BestTipPropagationState, P2pChannelsBestTipAction, P2pChannelsBestTipState}; +use super::{ + BestTipPropagationChannelMsg, BestTipPropagationState, P2pChannelsBestTipAction, + P2pChannelsBestTipState, +}; impl P2pChannelsBestTipState { /// Substate is accessed @@ -33,7 +36,16 @@ impl P2pChannelsBestTipState { *best_tip_state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsBestTipEffectfulAction::Init { peer_id }); + + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::BestTipPropagation, + on_success: redux::callback!( + on_best_tip_channel_init(peer_id: PeerId) -> crate::P2pAction { + P2pChannelsBestTipAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsBestTipAction::Pending { .. } => { @@ -69,7 +81,11 @@ impl P2pChannelsBestTipState { *local = BestTipPropagationState::Requested { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsBestTipEffectfulAction::RequestSend { peer_id }); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg: ChannelMsg::BestTipPropagation(BestTipPropagationChannelMsg::GetNext), + }); Ok(()) } P2pChannelsBestTipAction::Received { best_tip, .. } => { @@ -136,9 +152,12 @@ impl P2pChannelsBestTipState { let dispatcher = state_context.into_dispatcher(); if !is_libp2p { - dispatcher.push(P2pChannelsBestTipEffectfulAction::ResponseSend { + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - best_tip, + msg_id: MsgId::first(), + msg: ChannelMsg::BestTipPropagation(BestTipPropagationChannelMsg::BestTip( + best_tip.block, + )), }); return Ok(()); } diff --git a/p2p/src/channels/best_tip_effectful/mod.rs b/p2p/src/channels/best_tip_effectful/mod.rs deleted file mode 100644 index 010e97c36c..0000000000 --- a/p2p/src/channels/best_tip_effectful/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod p2p_channels_best_tip_effectful_actions; -pub use p2p_channels_best_tip_effectful_actions::P2pChannelsBestTipEffectfulAction; - -mod p2p_channels_best_tip_effectful_effects; diff --git a/p2p/src/channels/best_tip_effectful/p2p_channels_best_tip_effectful_actions.rs b/p2p/src/channels/best_tip_effectful/p2p_channels_best_tip_effectful_actions.rs deleted file mode 100644 index efb08ca2fa..0000000000 --- a/p2p/src/channels/best_tip_effectful/p2p_channels_best_tip_effectful_actions.rs +++ /dev/null @@ -1,31 +0,0 @@ -use openmina_core::{block::ArcBlockWithHash, ActionEvent}; -use serde::{Deserialize, Serialize}; - -use crate::{channels::P2pChannelsEffectfulAction, P2pState, PeerId}; - -#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] -#[action_event(fields(display(peer_id), best_tip = display(&best_tip.hash)))] -pub enum P2pChannelsBestTipEffectfulAction { - Init { - peer_id: PeerId, - }, - RequestSend { - peer_id: PeerId, - }, - ResponseSend { - peer_id: PeerId, - best_tip: ArcBlockWithHash, - }, -} - -impl redux::EnablingCondition for P2pChannelsBestTipEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(action: P2pChannelsBestTipEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::BestTip(action)) - } -} diff --git a/p2p/src/channels/best_tip_effectful/p2p_channels_best_tip_effectful_effects.rs b/p2p/src/channels/best_tip_effectful/p2p_channels_best_tip_effectful_effects.rs deleted file mode 100644 index 76fc9f3451..0000000000 --- a/p2p/src/channels/best_tip_effectful/p2p_channels_best_tip_effectful_effects.rs +++ /dev/null @@ -1,35 +0,0 @@ -use super::P2pChannelsBestTipEffectfulAction; -use crate::channels::{ - best_tip::{BestTipPropagationChannelMsg, P2pChannelsBestTipAction}, - ChannelId, MsgId, P2pChannelsService, -}; -use redux::ActionMeta; - -impl P2pChannelsBestTipEffectfulAction { - pub fn effects(self, _meta: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsBestTipEffectfulAction::Init { peer_id } => { - store - .service() - .channel_open(peer_id, ChannelId::BestTipPropagation); - store.dispatch(P2pChannelsBestTipAction::Pending { peer_id }); - } - P2pChannelsBestTipEffectfulAction::RequestSend { peer_id } => { - let msg = BestTipPropagationChannelMsg::GetNext; - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - P2pChannelsBestTipEffectfulAction::ResponseSend { peer_id, best_tip } => { - let msg = BestTipPropagationChannelMsg::BestTip(best_tip.block); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - } - } -} diff --git a/p2p/src/channels/mod.rs b/p2p/src/channels/mod.rs index 78b35cf3b3..a66b92323b 100644 --- a/p2p/src/channels/mod.rs +++ b/p2p/src/channels/mod.rs @@ -1,16 +1,10 @@ pub mod best_tip; -pub mod best_tip_effectful; pub mod rpc; -pub mod rpc_effectful; pub mod signaling; pub mod snark; -pub mod snark_effectful; pub mod snark_job_commitment; -pub mod snark_job_commitment_effectful; pub mod streaming_rpc; -pub mod streaming_rpc_effectful; pub mod transaction; -pub mod transaction_effectful; mod p2p_channels_state; pub use p2p_channels_state::*; @@ -23,6 +17,8 @@ mod p2p_channels_reducer; mod p2p_channels_service; pub use p2p_channels_service::*; +mod p2p_channels_effectful_effects; + use binprot::{BinProtRead, BinProtWrite}; use binprot_derive::{BinProtRead, BinProtWrite}; use derive_more::From; diff --git a/p2p/src/channels/p2p_channels_actions.rs b/p2p/src/channels/p2p_channels_actions.rs index 30e9ec7065..d456bce283 100644 --- a/p2p/src/channels/p2p_channels_actions.rs +++ b/p2p/src/channels/p2p_channels_actions.rs @@ -1,28 +1,25 @@ use openmina_core::log::ActionEvent; +use redux::Callback; use serde::{Deserialize, Serialize}; -use crate::{P2pState, PeerId}; +use crate::{ + identity::PublicKey, + webrtc::{EncryptedAnswer, EncryptedOffer, Offer, P2pConnectionResponse}, + P2pState, PeerId, +}; use super::{ best_tip::P2pChannelsBestTipAction, - best_tip_effectful::P2pChannelsBestTipEffectfulAction, rpc::P2pChannelsRpcAction, - rpc_effectful::P2pChannelsRpcEffectfulAction, signaling::{ discovery::P2pChannelsSignalingDiscoveryAction, - discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, exchange::P2pChannelsSignalingExchangeAction, - exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, }, snark::P2pChannelsSnarkAction, - snark_effectful::P2pChannelsSnarkEffectfulAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, - snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, streaming_rpc::P2pChannelsStreamingRpcAction, - streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction, transaction::P2pChannelsTransactionAction, - transaction_effectful::P2pChannelsTransactionEffectfulAction, - ChannelMsg, + ChannelId, ChannelMsg, MsgId, }; #[derive(Serialize, Deserialize, Debug, Clone, openmina_core::ActionEvent)] @@ -40,14 +37,36 @@ pub enum P2pChannelsAction { #[derive(Serialize, Deserialize, Debug, Clone, openmina_core::ActionEvent)] pub enum P2pChannelsEffectfulAction { - SignalingDiscovery(P2pChannelsSignalingDiscoveryEffectfulAction), - SignalingExchange(P2pChannelsSignalingExchangeEffectfulAction), - BestTip(P2pChannelsBestTipEffectfulAction), - Rpc(P2pChannelsRpcEffectfulAction), - Snark(P2pChannelsSnarkEffectfulAction), - SnarkJobCommitment(P2pChannelsSnarkJobCommitmentEffectfulAction), - StreamingRpc(P2pChannelsStreamingRpcEffectfulAction), - Transaction(P2pChannelsTransactionEffectfulAction), + InitChannel { + peer_id: PeerId, + id: ChannelId, + on_success: Callback, + }, + MessageSend { + peer_id: PeerId, + msg_id: MsgId, + msg: ChannelMsg, + }, + SignalingDiscoveryAnswerDecrypt { + peer_id: PeerId, + pub_key: PublicKey, + answer: EncryptedAnswer, + }, + SignalingDiscoveryOfferEncryptAndSend { + peer_id: PeerId, + pub_key: PublicKey, + offer: Box, + }, + SignalingExchangeOfferDecrypt { + peer_id: PeerId, + pub_key: PublicKey, + offer: EncryptedOffer, + }, + SignalingExchangeAnswerEncryptAndSend { + peer_id: PeerId, + pub_key: PublicKey, + answer: Option, + }, } impl P2pChannelsAction { @@ -83,17 +102,8 @@ impl redux::EnablingCondition for P2pChannelsAction { } impl redux::EnablingCondition for P2pChannelsEffectfulAction { - fn is_enabled(&self, state: &crate::P2pState, time: redux::Timestamp) -> bool { - match self { - P2pChannelsEffectfulAction::SignalingDiscovery(a) => a.is_enabled(state, time), - P2pChannelsEffectfulAction::SignalingExchange(a) => a.is_enabled(state, time), - P2pChannelsEffectfulAction::BestTip(a) => a.is_enabled(state, time), - P2pChannelsEffectfulAction::Transaction(a) => a.is_enabled(state, time), - P2pChannelsEffectfulAction::StreamingRpc(a) => a.is_enabled(state, time), - P2pChannelsEffectfulAction::SnarkJobCommitment(a) => a.is_enabled(state, time), - P2pChannelsEffectfulAction::Rpc(a) => a.is_enabled(state, time), - P2pChannelsEffectfulAction::Snark(a) => a.is_enabled(state, time), - } + fn is_enabled(&self, _state: &crate::P2pState, _time: redux::Timestamp) -> bool { + true } } diff --git a/p2p/src/channels/p2p_channels_effectful_effects.rs b/p2p/src/channels/p2p_channels_effectful_effects.rs new file mode 100644 index 0000000000..275157e419 --- /dev/null +++ b/p2p/src/channels/p2p_channels_effectful_effects.rs @@ -0,0 +1,134 @@ +use openmina_core::bug_condition; +use redux::ActionMeta; + +use crate::webrtc::{Offer, P2pConnectionResponse}; + +use super::{ + signaling::{ + discovery::{P2pChannelsSignalingDiscoveryAction, SignalingDiscoveryChannelMsg}, + exchange::{P2pChannelsSignalingExchangeAction, SignalingExchangeChannelMsg}, + }, + ChannelMsg, MsgId, P2pChannelsEffectfulAction, P2pChannelsService, +}; + +impl P2pChannelsEffectfulAction { + pub fn effects(self, meta: &ActionMeta, store: &mut Store) + where + Store: crate::P2pStore, + Store::Service: P2pChannelsService, + { + match self { + P2pChannelsEffectfulAction::InitChannel { + peer_id, + id, + on_success, + } => { + store.service().channel_open(peer_id, id); + store.dispatch_callback(on_success, peer_id); + } + P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id, + msg, + } => { + store.service().channel_send(peer_id, msg_id, msg); + } + P2pChannelsEffectfulAction::SignalingDiscoveryAnswerDecrypt { + peer_id, + pub_key, + answer, + } => { + match store + .service() + .decrypt::(&pub_key, &answer) + { + Err(_) => { + store.dispatch(P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { + peer_id, + answer: P2pConnectionResponse::SignalDecryptionFailed, + }); + } + Ok(answer) => { + store.dispatch(P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { + peer_id, + answer, + }); + } + } + } + P2pChannelsEffectfulAction::SignalingDiscoveryOfferEncryptAndSend { + peer_id, + pub_key, + offer, + } => match store.service().encrypt(&pub_key, offer.as_ref()) { + Err(_) => { + // TODO: handle + openmina_core::error!( + meta.time(); + summary = "Failed to encrypt webrtc offer", + peer_id = peer_id.to_string() + ); + } + Ok(offer) => { + let message = SignalingDiscoveryChannelMsg::DiscoveredAccept(offer); + store + .service() + .channel_send(peer_id, super::MsgId::first(), message.into()); + } + }, + P2pChannelsEffectfulAction::SignalingExchangeOfferDecrypt { + peer_id, + pub_key, + offer, + } => { + match store.service().decrypt::(&pub_key, &offer) { + Err(_) => { + store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptError { + peer_id, + }); + } + Ok(offer) if offer.identity_pub_key != pub_key => { + // TODO(binier): propagate specific error. + // This is invalid behavior either from relayer or offerer. + store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptError { + peer_id, + }); + } + Ok(offer) => { + store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptSuccess { + peer_id, + offer, + }); + } + } + } + P2pChannelsEffectfulAction::SignalingExchangeAnswerEncryptAndSend { + peer_id, + pub_key, + answer, + } => { + let Some(answer) = answer else { + let message = SignalingExchangeChannelMsg::Answer(None); + store.service().channel_send( + peer_id, + MsgId::first(), + ChannelMsg::SignalingExchange(message), + ); + return; + }; + + match store.service().encrypt(&pub_key, &answer) { + Err(_) => bug_condition!("Failed to encrypt webrtc answer. Shouldn't happen since we managed to decrypt sent offer."), + Ok(answer) => { + let message = SignalingExchangeChannelMsg::Answer(Some(answer)); + store.service().channel_send( + peer_id, + MsgId::first(), + ChannelMsg::SignalingExchange(message), + ); + } + } + } + } + } +} diff --git a/p2p/src/channels/rpc/p2p_channels_rpc_reducer.rs b/p2p/src/channels/rpc/p2p_channels_rpc_reducer.rs index 04d4fa5cfe..62d93a34c9 100644 --- a/p2p/src/channels/rpc/p2p_channels_rpc_reducer.rs +++ b/p2p/src/channels/rpc/p2p_channels_rpc_reducer.rs @@ -1,17 +1,14 @@ -use std::collections::VecDeque; - -use openmina_core::{block::BlockWithHash, bug_condition, error, Substate}; -use redux::ActionWithMeta; - -use crate::{ - channels::rpc_effectful::P2pChannelsRpcEffectfulAction, P2pNetworkRpcAction, P2pPeerAction, - P2pState, -}; - use super::{ P2pChannelsRpcAction, P2pChannelsRpcState, P2pRpcLocalState, P2pRpcRemotePendingRequestState, - P2pRpcRemoteState, P2pRpcResponse, MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS, + P2pRpcRemoteState, P2pRpcResponse, RpcChannelMsg, MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS, }; +use crate::{ + channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction}, + P2pNetworkRpcAction, P2pPeerAction, P2pState, +}; +use openmina_core::{block::BlockWithHash, bug_condition, error, Substate}; +use redux::ActionWithMeta; +use std::collections::VecDeque; impl P2pChannelsRpcState { /// Substate is accessed @@ -40,7 +37,16 @@ impl P2pChannelsRpcState { *rpc_state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsRpcEffectfulAction::Init { peer_id }); + + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::Rpc, + on_success: redux::callback!( + on_rpc_channel_init(peer_id: crate::PeerId) -> crate::P2pAction { + P2pChannelsRpcAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsRpcAction::Pending { .. } => { @@ -107,12 +113,15 @@ impl P2pChannelsRpcState { return Ok(()); } - dispatcher.push(P2pChannelsRpcEffectfulAction::RequestSend { + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - id, - request, - on_init, + msg_id: MsgId::first(), + msg: ChannelMsg::Rpc(RpcChannelMsg::Request(id, *request.clone())), }); + + if let Some(callback) = on_init { + dispatcher.push_callback(callback, (peer_id, id, *request)); + } Ok(()) } P2pChannelsRpcAction::Timeout { id, .. } => { @@ -238,10 +247,10 @@ impl P2pChannelsRpcState { return Ok(()); } - dispatcher.push(P2pChannelsRpcEffectfulAction::ResponseSend { + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - id, - response, + msg_id: MsgId::first(), + msg: ChannelMsg::Rpc(RpcChannelMsg::Response(id, response.map(|v| *v))), }); Ok(()) } diff --git a/p2p/src/channels/rpc_effectful/mod.rs b/p2p/src/channels/rpc_effectful/mod.rs deleted file mode 100644 index 302be5eef2..0000000000 --- a/p2p/src/channels/rpc_effectful/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod p2p_channels_rpc_effectful_actions; -pub use p2p_channels_rpc_effectful_actions::P2pChannelsRpcEffectfulAction; -mod p2p_channels_rpc_effectful_effects; diff --git a/p2p/src/channels/rpc_effectful/p2p_channels_rpc_effectful_actions.rs b/p2p/src/channels/rpc_effectful/p2p_channels_rpc_effectful_actions.rs deleted file mode 100644 index 57fb90b0f8..0000000000 --- a/p2p/src/channels/rpc_effectful/p2p_channels_rpc_effectful_actions.rs +++ /dev/null @@ -1,42 +0,0 @@ -use openmina_core::ActionEvent; -use redux::Timestamp; -use serde::{Deserialize, Serialize}; - -use crate::{ - channels::{ - rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse}, - P2pChannelsEffectfulAction, - }, - P2pState, PeerId, -}; - -#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] -#[action_event(fields(display(peer_id)))] -pub enum P2pChannelsRpcEffectfulAction { - Init { - peer_id: PeerId, - }, - RequestSend { - peer_id: PeerId, - id: P2pRpcId, - request: Box, - on_init: Option>, - }, - ResponseSend { - peer_id: PeerId, - id: P2pRpcId, - response: Option>, - }, -} - -impl redux::EnablingCondition for P2pChannelsRpcEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(a: P2pChannelsRpcEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::Rpc(a)) - } -} diff --git a/p2p/src/channels/rpc_effectful/p2p_channels_rpc_effectful_effects.rs b/p2p/src/channels/rpc_effectful/p2p_channels_rpc_effectful_effects.rs deleted file mode 100644 index 098f43093f..0000000000 --- a/p2p/src/channels/rpc_effectful/p2p_channels_rpc_effectful_effects.rs +++ /dev/null @@ -1,47 +0,0 @@ -use super::P2pChannelsRpcEffectfulAction; -use crate::channels::{ - rpc::{P2pChannelsRpcAction, RpcChannelMsg}, - ChannelId, MsgId, P2pChannelsService, -}; -use redux::ActionMeta; - -impl P2pChannelsRpcEffectfulAction { - pub fn effects(self, _meta: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsRpcEffectfulAction::Init { peer_id } => { - store.service().channel_open(peer_id, ChannelId::Rpc); - // TODO(akoptelov): open a new stream, if we decide not to forcibly do that on connection established - store.dispatch(P2pChannelsRpcAction::Pending { peer_id }); - } - P2pChannelsRpcEffectfulAction::RequestSend { - peer_id, - id, - request, - on_init, - } => { - let msg = RpcChannelMsg::Request(id, *request.clone()); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - - if let Some(on_init) = on_init { - store.dispatch_callback(on_init, (peer_id, id, *request)); - } - } - P2pChannelsRpcEffectfulAction::ResponseSend { - peer_id, - id, - response, - } => { - let msg = RpcChannelMsg::Response(id, response.map(|v| *v)); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - } - } -} diff --git a/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs index b934647c95..bb9ed9d3ed 100644 --- a/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs +++ b/p2p/src/channels/signaling/discovery/p2p_channels_signaling_discovery_reducer.rs @@ -2,9 +2,9 @@ use openmina_core::{bug_condition, Substate}; use redux::ActionWithMeta; use crate::{ - channels::signaling::{ - discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, - exchange::P2pChannelsSignalingExchangeAction, + channels::{ + signaling::exchange::P2pChannelsSignalingExchangeAction, ChannelId, MsgId, + P2pChannelsEffectfulAction, }, connection::{ outgoing::{P2pConnectionOutgoingAction, P2pConnectionOutgoingInitOpts}, @@ -44,7 +44,15 @@ impl P2pChannelsSignalingDiscoveryState { *state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::Init { peer_id }); + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::SignalingDiscovery, + on_success: redux::callback!( + on_signaling_discovery_channel_init(peer_id: crate::PeerId) -> crate::P2pAction { + P2pChannelsSignalingExchangeAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsSignalingDiscoveryAction::Pending { .. } => { @@ -73,10 +81,12 @@ impl P2pChannelsSignalingDiscoveryState { *local = SignalingDiscoveryState::Requested { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - let message = SignalingDiscoveryChannelMsg::GetNext; - dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + + let msg = SignalingDiscoveryChannelMsg::GetNext.into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - message, + msg_id: MsgId::first(), + msg, }); Ok(()) } @@ -112,11 +122,12 @@ impl P2pChannelsSignalingDiscoveryState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + + let msg = SignalingDiscoveryChannelMsg::Discovered { target_public_key }.into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - message: SignalingDiscoveryChannelMsg::Discovered { - target_public_key: target_public_key.clone(), - }, + msg_id: MsgId::first(), + msg, }); Ok(()) } @@ -195,10 +206,12 @@ impl P2pChannelsSignalingDiscoveryState { *local = SignalingDiscoveryState::Answered { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - let message = SignalingDiscoveryChannelMsg::Answer(answer.clone()); - dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + + let msg = SignalingDiscoveryChannelMsg::Answer(answer.clone()).into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - message, + msg_id: MsgId::first(), + msg, }); Ok(()) } @@ -223,10 +236,12 @@ impl P2pChannelsSignalingDiscoveryState { *remote = SignalingDiscoveryState::DiscoveryRequested { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - let message = SignalingDiscoveryChannelMsg::Discover; - dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + + let msg = SignalingDiscoveryChannelMsg::Discover.into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - message, + msg_id: MsgId::first(), + msg, }); Ok(()) } @@ -289,10 +304,12 @@ impl P2pChannelsSignalingDiscoveryState { target_public_key, }; let dispatcher = state_context.into_dispatcher(); - let message = SignalingDiscoveryChannelMsg::DiscoveredReject; - dispatcher.push(P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { + + let msg = SignalingDiscoveryChannelMsg::DiscoveredReject.into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - message, + msg_id: MsgId::first(), + msg, }); Ok(()) } @@ -327,10 +344,10 @@ impl P2pChannelsSignalingDiscoveryState { peer_id: target_public_key.peer_id(), }); dispatcher.push( - P2pChannelsSignalingDiscoveryEffectfulAction::OfferEncryptAndSend { + P2pChannelsEffectfulAction::SignalingDiscoveryOfferEncryptAndSend { peer_id, pub_key: target_public_key, - offer: offer.clone(), + offer, }, ); Ok(()) @@ -363,7 +380,7 @@ impl P2pChannelsSignalingDiscoveryState { error: P2pConnectionErrorResponse::InternalError, }), Some(answer) => dispatcher.push( - P2pChannelsSignalingDiscoveryEffectfulAction::AnswerDecrypt { + P2pChannelsEffectfulAction::SignalingDiscoveryAnswerDecrypt { peer_id, pub_key: target_public_key, answer: answer.clone(), diff --git a/p2p/src/channels/signaling/discovery_effectful/mod.rs b/p2p/src/channels/signaling/discovery_effectful/mod.rs deleted file mode 100644 index ae681677f3..0000000000 --- a/p2p/src/channels/signaling/discovery_effectful/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod p2p_channels_signaling_discovery_effectful_actions; -pub use p2p_channels_signaling_discovery_effectful_actions::P2pChannelsSignalingDiscoveryEffectfulAction; - -mod p2p_channels_signaling_discovery_effectful_effects; diff --git a/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_actions.rs b/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_actions.rs deleted file mode 100644 index 20e133e489..0000000000 --- a/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_actions.rs +++ /dev/null @@ -1,44 +0,0 @@ -use openmina_core::ActionEvent; -use serde::{Deserialize, Serialize}; - -use crate::{ - channels::{signaling::discovery::SignalingDiscoveryChannelMsg, P2pChannelsEffectfulAction}, - connection::Offer, - identity::PublicKey, - webrtc::EncryptedAnswer, - P2pState, PeerId, -}; - -#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] -#[action_event(fields(display(peer_id)))] -pub enum P2pChannelsSignalingDiscoveryEffectfulAction { - Init { - peer_id: PeerId, - }, - MessageSend { - peer_id: PeerId, - message: SignalingDiscoveryChannelMsg, - }, - OfferEncryptAndSend { - peer_id: PeerId, - pub_key: PublicKey, - offer: Box, - }, - AnswerDecrypt { - peer_id: PeerId, - pub_key: PublicKey, - answer: EncryptedAnswer, - }, -} - -impl redux::EnablingCondition for P2pChannelsSignalingDiscoveryEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(action: P2pChannelsSignalingDiscoveryEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::SignalingDiscovery(action)) - } -} diff --git a/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_effects.rs b/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_effects.rs deleted file mode 100644 index 9580ec90b4..0000000000 --- a/p2p/src/channels/signaling/discovery_effectful/p2p_channels_signaling_discovery_effectful_effects.rs +++ /dev/null @@ -1,75 +0,0 @@ -use redux::ActionMeta; - -use crate::{ - channels::{ - signaling::discovery::{P2pChannelsSignalingDiscoveryAction, SignalingDiscoveryChannelMsg}, - ChannelId, MsgId, - }, - connection::P2pConnectionResponse, - P2pChannelsService, -}; - -use super::P2pChannelsSignalingDiscoveryEffectfulAction; - -impl P2pChannelsSignalingDiscoveryEffectfulAction { - pub fn effects(self, _meta: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsSignalingDiscoveryEffectfulAction::Init { peer_id } => { - store - .service() - .channel_open(peer_id, ChannelId::SignalingDiscovery); - store.dispatch(P2pChannelsSignalingDiscoveryAction::Pending { peer_id }); - } - P2pChannelsSignalingDiscoveryEffectfulAction::MessageSend { peer_id, message } => { - message_send(store.service(), peer_id, message); - } - P2pChannelsSignalingDiscoveryEffectfulAction::OfferEncryptAndSend { - peer_id, - pub_key, - offer, - } => match store.service().encrypt(&pub_key, &*offer) { - Err(_) => { - // todo!("Failed to encrypt webrtc offer. Handle it.") - } - Ok(offer) => { - let message = SignalingDiscoveryChannelMsg::DiscoveredAccept(offer); - message_send(store.service(), peer_id, message); - } - }, - P2pChannelsSignalingDiscoveryEffectfulAction::AnswerDecrypt { - peer_id, - pub_key, - answer, - } => { - match store - .service() - .decrypt::(&pub_key, &answer) - { - Err(_) => { - store.dispatch(P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { - peer_id, - answer: P2pConnectionResponse::SignalDecryptionFailed, - }); - } - Ok(answer) => { - store.dispatch(P2pChannelsSignalingDiscoveryAction::AnswerDecrypted { - peer_id, - answer, - }); - } - } - } - } - } -} - -fn message_send(service: &mut S, peer_id: crate::PeerId, message: SignalingDiscoveryChannelMsg) -where - S: P2pChannelsService, -{ - service.channel_send(peer_id, MsgId::first(), message.into()) -} diff --git a/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs index 737d6e2e78..37620086c4 100644 --- a/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs +++ b/p2p/src/channels/signaling/exchange/p2p_channels_signaling_exchange_reducer.rs @@ -2,9 +2,9 @@ use openmina_core::{bug_condition, Substate}; use redux::ActionWithMeta; use crate::{ - channels::signaling::{ - discovery::P2pChannelsSignalingDiscoveryAction, - exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, + channels::{ + signaling::discovery::P2pChannelsSignalingDiscoveryAction, ChannelId, MsgId, + P2pChannelsEffectfulAction, }, connection::{ incoming::{ @@ -45,7 +45,15 @@ impl P2pChannelsSignalingExchangeState { *state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::Init { peer_id }); + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::SignalingExchange, + on_success: redux::callback!( + on_signaling_exchange_channel_init(peer_id: crate::PeerId) -> crate::P2pAction { + P2pChannelsSignalingExchangeAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsSignalingExchangeAction::Pending { .. } => { @@ -74,10 +82,12 @@ impl P2pChannelsSignalingExchangeState { *local = SignalingExchangeState::Requested { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - let message = SignalingExchangeChannelMsg::GetNext; - dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::MessageSend { + + let msg = SignalingExchangeChannelMsg::GetNext.into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - message, + msg_id: MsgId::first(), + msg, }); Ok(()) } @@ -100,7 +110,7 @@ impl P2pChannelsSignalingExchangeState { let dispatcher = state_context.into_dispatcher(); let offer = offer.clone(); - dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::OfferDecrypt { + dispatcher.push(P2pChannelsEffectfulAction::SignalingExchangeOfferDecrypt { peer_id, pub_key: offerer_pub_key.clone(), offer, @@ -162,7 +172,7 @@ impl P2pChannelsSignalingExchangeState { let answer = answer.clone(); let dispatcher = state_context.into_dispatcher(); dispatcher.push( - P2pChannelsSignalingExchangeEffectfulAction::AnswerEncryptAndSend { + P2pChannelsEffectfulAction::SignalingExchangeAnswerEncryptAndSend { peer_id, pub_key: offerer_pub_key.clone(), answer: Some(answer), @@ -205,13 +215,16 @@ impl P2pChannelsSignalingExchangeState { offerer_pub_key: offerer_pub_key.clone(), }; let dispatcher = state_context.into_dispatcher(); - let message = SignalingExchangeChannelMsg::OfferToYou { - offerer_pub_key: offerer_pub_key.clone(), - offer: offer.clone(), - }; - dispatcher.push(P2pChannelsSignalingExchangeEffectfulAction::MessageSend { + let msg = SignalingExchangeChannelMsg::OfferToYou { + offerer_pub_key, + offer, + } + .into(); + + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - message, + msg_id: MsgId::first(), + msg, }); Ok(()) } diff --git a/p2p/src/channels/signaling/exchange_effectful/mod.rs b/p2p/src/channels/signaling/exchange_effectful/mod.rs deleted file mode 100644 index 89a36e2c5b..0000000000 --- a/p2p/src/channels/signaling/exchange_effectful/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod p2p_channels_signaling_exchange_effectful_actions; -pub use p2p_channels_signaling_exchange_effectful_actions::P2pChannelsSignalingExchangeEffectfulAction; - -mod p2p_channels_signaling_exchange_effectful_effects; diff --git a/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_actions.rs b/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_actions.rs deleted file mode 100644 index 6daa07af78..0000000000 --- a/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_actions.rs +++ /dev/null @@ -1,44 +0,0 @@ -use openmina_core::ActionEvent; -use serde::{Deserialize, Serialize}; - -use crate::{ - channels::{signaling::exchange::SignalingExchangeChannelMsg, P2pChannelsEffectfulAction}, - connection::P2pConnectionResponse, - identity::PublicKey, - webrtc::EncryptedOffer, - P2pState, PeerId, -}; - -#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] -#[action_event(fields(display(peer_id)))] -pub enum P2pChannelsSignalingExchangeEffectfulAction { - Init { - peer_id: PeerId, - }, - MessageSend { - peer_id: PeerId, - message: SignalingExchangeChannelMsg, - }, - OfferDecrypt { - peer_id: PeerId, - pub_key: PublicKey, - offer: EncryptedOffer, - }, - AnswerEncryptAndSend { - peer_id: PeerId, - pub_key: PublicKey, - answer: Option, - }, -} - -impl redux::EnablingCondition for P2pChannelsSignalingExchangeEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(action: P2pChannelsSignalingExchangeEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::SignalingExchange(action)) - } -} diff --git a/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_effects.rs b/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_effects.rs deleted file mode 100644 index 58116b1fcc..0000000000 --- a/p2p/src/channels/signaling/exchange_effectful/p2p_channels_signaling_exchange_effectful_effects.rs +++ /dev/null @@ -1,96 +0,0 @@ -use openmina_core::bug_condition; -use redux::ActionMeta; - -use crate::{ - channels::{ - signaling::exchange::{P2pChannelsSignalingExchangeAction, SignalingExchangeChannelMsg}, - ChannelId, MsgId, - }, - webrtc::{EncryptedAnswer, Offer}, - P2pChannelsService, -}; - -use super::P2pChannelsSignalingExchangeEffectfulAction; - -impl P2pChannelsSignalingExchangeEffectfulAction { - pub fn effects(self, _meta: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsSignalingExchangeEffectfulAction::Init { peer_id } => { - store - .service() - .channel_open(peer_id, ChannelId::SignalingExchange); - store.dispatch(P2pChannelsSignalingExchangeAction::Pending { peer_id }); - } - P2pChannelsSignalingExchangeEffectfulAction::MessageSend { peer_id, message } => { - message_send(store.service(), peer_id, message); - } - P2pChannelsSignalingExchangeEffectfulAction::OfferDecrypt { - peer_id, - pub_key, - offer, - } => { - match store.service().decrypt::(&pub_key, &offer) { - Err(_) => { - store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptError { - peer_id, - }); - } - Ok(offer) if offer.identity_pub_key != pub_key => { - // TODO(binier): propagate specific error. - // This is invalid behavior either from relayer or offerer. - store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptError { - peer_id, - }); - } - Ok(offer) => { - store.dispatch(P2pChannelsSignalingExchangeAction::OfferDecryptSuccess { - peer_id, - offer, - }); - } - } - } - P2pChannelsSignalingExchangeEffectfulAction::AnswerEncryptAndSend { - peer_id, - pub_key, - answer, - } => { - let answer = match answer { - None => { - answer_message_send(store.service(), peer_id, None); - return; - } - Some(v) => v, - }; - match store.service().encrypt(&pub_key, &answer) { - Err(_) => bug_condition!("Failed to encrypt webrtc answer. Shouldn't happen since we managed to decrypt sent offer."), - Ok(answer) => { - answer_message_send(store.service(), peer_id, Some(answer)); - } - } - } - } - } -} - -fn answer_message_send(service: &mut S, peer_id: crate::PeerId, answer: Option) -where - S: P2pChannelsService, -{ - message_send( - service, - peer_id, - SignalingExchangeChannelMsg::Answer(answer), - ) -} - -fn message_send(service: &mut S, peer_id: crate::PeerId, message: SignalingExchangeChannelMsg) -where - S: P2pChannelsService, -{ - service.channel_send(peer_id, MsgId::first(), message.into()) -} diff --git a/p2p/src/channels/signaling/mod.rs b/p2p/src/channels/signaling/mod.rs index 3dd52d4f66..84fa98a268 100644 --- a/p2p/src/channels/signaling/mod.rs +++ b/p2p/src/channels/signaling/mod.rs @@ -16,9 +16,7 @@ //! 7. [discovery] Relayer relays the answer to the dialer. pub mod discovery; -pub mod discovery_effectful; pub mod exchange; -pub mod exchange_effectful; mod p2p_channels_signaling_state; pub use p2p_channels_signaling_state::*; diff --git a/p2p/src/channels/snark/p2p_channels_snark_reducer.rs b/p2p/src/channels/snark/p2p_channels_snark_reducer.rs index 8fa06efa63..c17ea5d921 100644 --- a/p2p/src/channels/snark/p2p_channels_snark_reducer.rs +++ b/p2p/src/channels/snark/p2p_channels_snark_reducer.rs @@ -2,10 +2,14 @@ use openmina_core::{bug_condition, Substate}; use redux::ActionWithMeta; use crate::{ - channels::snark_effectful::P2pChannelsSnarkEffectfulAction, P2pNetworkPubsubAction, P2pState, + channels::{ChannelId, MsgId, P2pChannelsEffectfulAction}, + P2pNetworkPubsubAction, P2pState, }; -use super::{P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationState}; +use super::{ + P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationChannelMsg, + SnarkPropagationState, +}; use mina_p2p_messages::{gossip::GossipNetMessageV2, v2}; impl P2pChannelsSnarkState { @@ -32,7 +36,16 @@ impl P2pChannelsSnarkState { *state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSnarkEffectfulAction::Init { peer_id }); + + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::SnarkPropagation, + on_success: redux::callback!( + on_snark_channel_init(peer_id: crate::PeerId) -> crate::P2pAction { + P2pChannelsSnarkAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsSnarkAction::Pending { .. } => { @@ -65,7 +78,11 @@ impl P2pChannelsSnarkState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSnarkEffectfulAction::RequestSend { peer_id, limit }); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg: SnarkPropagationChannelMsg::GetNext { limit }.into(), + }); Ok(()) } P2pChannelsSnarkAction::PromiseReceived { promised_count, .. } => { @@ -178,7 +195,19 @@ impl P2pChannelsSnarkState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSnarkEffectfulAction::ResponseSend { peer_id, snarks }); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg: SnarkPropagationChannelMsg::WillSend { count }.into(), + }); + + for snark in snarks { + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg: SnarkPropagationChannelMsg::Snark(snark).into(), + }); + } Ok(()) } #[cfg(feature = "p2p-libp2p")] diff --git a/p2p/src/channels/snark_effectful/mod.rs b/p2p/src/channels/snark_effectful/mod.rs deleted file mode 100644 index 8f1d95bc4b..0000000000 --- a/p2p/src/channels/snark_effectful/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod p2p_channels_snark_effectful_actions; -pub use p2p_channels_snark_effectful_actions::P2pChannelsSnarkEffectfulAction; -mod p2p_channels_snark_effectful_effects; diff --git a/p2p/src/channels/snark_effectful/p2p_channels_snark_effectful_actions.rs b/p2p/src/channels/snark_effectful/p2p_channels_snark_effectful_actions.rs deleted file mode 100644 index a3214d7e26..0000000000 --- a/p2p/src/channels/snark_effectful/p2p_channels_snark_effectful_actions.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::{channels::P2pChannelsEffectfulAction, P2pState, PeerId}; -use openmina_core::{snark::SnarkInfo, ActionEvent}; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] -#[action_event(fields(display(peer_id)))] -pub enum P2pChannelsSnarkEffectfulAction { - Init { - peer_id: PeerId, - }, - RequestSend { - peer_id: PeerId, - limit: u8, - }, - ResponseSend { - peer_id: PeerId, - snarks: Vec, - }, -} - -impl redux::EnablingCondition for P2pChannelsSnarkEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(action: P2pChannelsSnarkEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::Snark(action)) - } -} diff --git a/p2p/src/channels/snark_effectful/p2p_channels_snark_effectful_effects.rs b/p2p/src/channels/snark_effectful/p2p_channels_snark_effectful_effects.rs deleted file mode 100644 index 4ef59ccbb6..0000000000 --- a/p2p/src/channels/snark_effectful/p2p_channels_snark_effectful_effects.rs +++ /dev/null @@ -1,46 +0,0 @@ -use super::P2pChannelsSnarkEffectfulAction; -use crate::channels::{ - snark::{P2pChannelsSnarkAction, SnarkPropagationChannelMsg}, - ChannelId, MsgId, P2pChannelsService, -}; -use redux::ActionMeta; - -impl P2pChannelsSnarkEffectfulAction { - pub fn effects(self, _: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsSnarkEffectfulAction::Init { peer_id } => { - store - .service() - .channel_open(peer_id, ChannelId::SnarkPropagation); - store.dispatch(P2pChannelsSnarkAction::Pending { peer_id }); - } - P2pChannelsSnarkEffectfulAction::RequestSend { peer_id, limit } => { - let msg = SnarkPropagationChannelMsg::GetNext { limit }; - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - P2pChannelsSnarkEffectfulAction::ResponseSend { - peer_id, snarks, .. - } => { - let msg = SnarkPropagationChannelMsg::WillSend { - count: snarks.len() as u8, - }; - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - - for snark in snarks { - let msg = SnarkPropagationChannelMsg::Snark(snark); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - } - } - } -} diff --git a/p2p/src/channels/snark_job_commitment/p2p_channels_snark_job_commitment_reducer.rs b/p2p/src/channels/snark_job_commitment/p2p_channels_snark_job_commitment_reducer.rs index 14586ab34e..236a67ae93 100644 --- a/p2p/src/channels/snark_job_commitment/p2p_channels_snark_job_commitment_reducer.rs +++ b/p2p/src/channels/snark_job_commitment/p2p_channels_snark_job_commitment_reducer.rs @@ -2,13 +2,13 @@ use openmina_core::{bug_condition, Substate}; use redux::ActionWithMeta; use crate::{ - channels::snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, + channels::{ChannelId, MsgId, P2pChannelsEffectfulAction}, P2pState, }; use super::{ P2pChannelsSnarkJobCommitmentAction, P2pChannelsSnarkJobCommitmentState, - SnarkJobCommitmentPropagationState, + SnarkJobCommitmentPropagationChannelMsg, SnarkJobCommitmentPropagationState, }; const LIMIT: u8 = 16; @@ -37,7 +37,15 @@ impl P2pChannelsSnarkJobCommitmentState { *snark_job_state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSnarkJobCommitmentAction::Pending { peer_id }); + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::SnarkJobCommitmentPropagation, + on_success: redux::callback!( + on_snark_job_commitment_channel_init(peer_id: crate::PeerId) -> crate::P2pAction { + P2pChannelsSnarkJobCommitmentAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsSnarkJobCommitmentAction::Pending { .. } => { @@ -197,10 +205,23 @@ impl P2pChannelsSnarkJobCommitmentState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsSnarkJobCommitmentEffectfulAction::ResponseSend { + let msg = SnarkJobCommitmentPropagationChannelMsg::WillSend { count }.into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - commitments, + msg_id: MsgId::first(), + msg, }); + + for commitment in commitments { + let msg = + SnarkJobCommitmentPropagationChannelMsg::Commitment(commitment).into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg, + }); + } + Ok(()) } } diff --git a/p2p/src/channels/snark_job_commitment_effectful/mod.rs b/p2p/src/channels/snark_job_commitment_effectful/mod.rs deleted file mode 100644 index 91a01082ef..0000000000 --- a/p2p/src/channels/snark_job_commitment_effectful/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod p2p_channels_snark_job_commitment_effectful_actions; -pub use p2p_channels_snark_job_commitment_effectful_actions::P2pChannelsSnarkJobCommitmentEffectfulAction; -mod p2p_channels_snark_job_commitment_effectful_effects; diff --git a/p2p/src/channels/snark_job_commitment_effectful/p2p_channels_snark_job_commitment_effectful_actions.rs b/p2p/src/channels/snark_job_commitment_effectful/p2p_channels_snark_job_commitment_effectful_actions.rs deleted file mode 100644 index d3836687f8..0000000000 --- a/p2p/src/channels/snark_job_commitment_effectful/p2p_channels_snark_job_commitment_effectful_actions.rs +++ /dev/null @@ -1,32 +0,0 @@ -use openmina_core::{snark::SnarkJobCommitment, ActionEvent}; -use serde::{Deserialize, Serialize}; - -use crate::{channels::P2pChannelsEffectfulAction, P2pState, PeerId}; - -#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)] -#[action_event(fields(display(peer_id)))] -pub enum P2pChannelsSnarkJobCommitmentEffectfulAction { - Init { - peer_id: PeerId, - }, - RequestSend { - peer_id: PeerId, - limit: u8, - }, - ResponseSend { - peer_id: PeerId, - commitments: Vec, - }, -} - -impl redux::EnablingCondition for P2pChannelsSnarkJobCommitmentEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(action: P2pChannelsSnarkJobCommitmentEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::SnarkJobCommitment(action)) - } -} diff --git a/p2p/src/channels/snark_job_commitment_effectful/p2p_channels_snark_job_commitment_effectful_effects.rs b/p2p/src/channels/snark_job_commitment_effectful/p2p_channels_snark_job_commitment_effectful_effects.rs deleted file mode 100644 index 94741f1b4c..0000000000 --- a/p2p/src/channels/snark_job_commitment_effectful/p2p_channels_snark_job_commitment_effectful_effects.rs +++ /dev/null @@ -1,55 +0,0 @@ -use redux::ActionMeta; - -use crate::channels::{ - snark_job_commitment::{ - P2pChannelsSnarkJobCommitmentAction, SnarkJobCommitmentPropagationChannelMsg, - }, - ChannelId, MsgId, P2pChannelsService, -}; - -use super::p2p_channels_snark_job_commitment_effectful_actions::P2pChannelsSnarkJobCommitmentEffectfulAction; - -impl P2pChannelsSnarkJobCommitmentEffectfulAction { - pub fn effects(self, _: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsSnarkJobCommitmentEffectfulAction::Init { peer_id } => { - store - .service() - .channel_open(peer_id, ChannelId::SnarkJobCommitmentPropagation); - store.dispatch(P2pChannelsSnarkJobCommitmentAction::Pending { peer_id }); - } - P2pChannelsSnarkJobCommitmentEffectfulAction::RequestSend { peer_id, limit } => { - let msg = SnarkJobCommitmentPropagationChannelMsg::GetNext { limit }; - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - P2pChannelsSnarkJobCommitmentEffectfulAction::ResponseSend { - peer_id, - commitments, - } => { - if commitments.is_empty() { - return; - } - - let msg = SnarkJobCommitmentPropagationChannelMsg::WillSend { - count: commitments.len() as u8, - }; - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - - for commitment in commitments { - let msg = SnarkJobCommitmentPropagationChannelMsg::Commitment(commitment); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - } - } - } -} diff --git a/p2p/src/channels/streaming_rpc/p2p_channels_streaming_rpc_reducer.rs b/p2p/src/channels/streaming_rpc/p2p_channels_streaming_rpc_reducer.rs index 3340b327ec..463b5436f6 100644 --- a/p2p/src/channels/streaming_rpc/p2p_channels_streaming_rpc_reducer.rs +++ b/p2p/src/channels/streaming_rpc/p2p_channels_streaming_rpc_reducer.rs @@ -1,13 +1,16 @@ use openmina_core::{bug_condition, Substate}; use redux::ActionWithMeta; -use crate::{channels::streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction, P2pState}; +use crate::{ + channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction}, + P2pState, +}; use super::{ staged_ledger_parts::{StagedLedgerPartsReceiveProgress, StagedLedgerPartsSendProgress}, P2pChannelsStreamingRpcAction, P2pChannelsStreamingRpcState, P2pStreamingRpcLocalState, P2pStreamingRpcRemoteState, P2pStreamingRpcRequest, P2pStreamingRpcResponseFull, - P2pStreamingRpcSendProgress, + P2pStreamingRpcSendProgress, StreamingRpcChannelMsg, }; impl P2pChannelsStreamingRpcState { @@ -36,7 +39,15 @@ impl P2pChannelsStreamingRpcState { *streaming_rpc_state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsStreamingRpcEffectfulAction::Init { peer_id }); + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::StreamingRpc, + on_success: redux::callback!( + on_streaming_rpc_channel_init(peer_id: crate::PeerId) -> crate::P2pAction { + P2pChannelsStreamingRpcAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsStreamingRpcAction::Pending { .. } => { @@ -88,12 +99,14 @@ impl P2pChannelsStreamingRpcState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsStreamingRpcEffectfulAction::RequestSend { + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - id, - request, - on_init, + msg_id: MsgId::first(), + msg: StreamingRpcChannelMsg::Request(id, *request.clone()).into(), }); + if let Some(callback) = on_init { + dispatcher.push_callback(callback, (peer_id, id, *request)); + } Ok(()) } P2pChannelsStreamingRpcAction::Timeout { id, .. } => { @@ -125,9 +138,11 @@ impl P2pChannelsStreamingRpcState { } let dispatcher = state_context.into_dispatcher(); - dispatcher.push( - P2pChannelsStreamingRpcEffectfulAction::ResponseNextPartGet { peer_id, id }, - ); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg: ChannelMsg::StreamingRpc(StreamingRpcChannelMsg::Next(id)), + }); Ok(()) } P2pChannelsStreamingRpcAction::ResponsePartReceived { response, id, .. } => { @@ -269,11 +284,19 @@ impl P2pChannelsStreamingRpcState { } let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsStreamingRpcEffectfulAction::ResponseSendInit { - peer_id, - id, - response, - }); + if response.is_none() { + let msg = StreamingRpcChannelMsg::Response(id, None).into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg, + }); + dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id }); + return Ok(()); + } + + dispatcher + .push(P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id }); Ok(()) } P2pChannelsStreamingRpcAction::ResponsePartNextSend { id, .. } => { @@ -368,11 +391,14 @@ impl P2pChannelsStreamingRpcState { } let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsStreamingRpcEffectfulAction::ResponsePartSend { + + let msg = StreamingRpcChannelMsg::Response(id, Some(*response)).into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - id, - response, + msg_id: MsgId::first(), + msg, }); + dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id }); Ok(()) } P2pChannelsStreamingRpcAction::ResponseSent { id, .. } => { diff --git a/p2p/src/channels/streaming_rpc_effectful/mod.rs b/p2p/src/channels/streaming_rpc_effectful/mod.rs deleted file mode 100644 index 864b055264..0000000000 --- a/p2p/src/channels/streaming_rpc_effectful/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod p2p_channels_streaming_rpc_effectful_actions; -pub use p2p_channels_streaming_rpc_effectful_actions::P2pChannelsStreamingRpcEffectfulAction; -mod p2p_channels_streaming_rpc_effectful_effects; diff --git a/p2p/src/channels/streaming_rpc_effectful/p2p_channels_streaming_rpc_effectful_actions.rs b/p2p/src/channels/streaming_rpc_effectful/p2p_channels_streaming_rpc_effectful_actions.rs deleted file mode 100644 index cc5fac9b05..0000000000 --- a/p2p/src/channels/streaming_rpc_effectful/p2p_channels_streaming_rpc_effectful_actions.rs +++ /dev/null @@ -1,53 +0,0 @@ -use crate::{ - channels::{ - streaming_rpc::{ - P2pStreamingRpcId, P2pStreamingRpcRequest, P2pStreamingRpcResponse, - P2pStreamingRpcResponseFull, - }, - P2pChannelsEffectfulAction, - }, - P2pState, PeerId, -}; -use openmina_core::ActionEvent; -use redux::Timestamp; -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] -#[action_event(fields(display(peer_id)))] -pub enum P2pChannelsStreamingRpcEffectfulAction { - Init { - peer_id: PeerId, - }, - RequestSend { - peer_id: PeerId, - id: P2pStreamingRpcId, - request: Box, - on_init: Option>, - }, - ResponseNextPartGet { - peer_id: PeerId, - id: P2pStreamingRpcId, - }, - ResponseSendInit { - peer_id: PeerId, - id: P2pStreamingRpcId, - response: Option, - }, - ResponsePartSend { - peer_id: PeerId, - id: P2pStreamingRpcId, - response: Box, - }, -} - -impl redux::EnablingCondition for P2pChannelsStreamingRpcEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(a: P2pChannelsStreamingRpcEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::StreamingRpc(a)) - } -} diff --git a/p2p/src/channels/streaming_rpc_effectful/p2p_channels_streaming_rpc_effectful_effects.rs b/p2p/src/channels/streaming_rpc_effectful/p2p_channels_streaming_rpc_effectful_effects.rs deleted file mode 100644 index 244cd3db6a..0000000000 --- a/p2p/src/channels/streaming_rpc_effectful/p2p_channels_streaming_rpc_effectful_effects.rs +++ /dev/null @@ -1,72 +0,0 @@ -use redux::ActionMeta; - -use crate::channels::{ - streaming_rpc::{P2pChannelsStreamingRpcAction, StreamingRpcChannelMsg}, - ChannelId, MsgId, P2pChannelsService, -}; - -use super::P2pChannelsStreamingRpcEffectfulAction; - -impl P2pChannelsStreamingRpcEffectfulAction { - pub fn effects(self, _: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsStreamingRpcEffectfulAction::Init { peer_id } => { - store - .service() - .channel_open(peer_id, ChannelId::StreamingRpc); - store.dispatch(P2pChannelsStreamingRpcAction::Pending { peer_id }); - } - P2pChannelsStreamingRpcEffectfulAction::RequestSend { - peer_id, - id, - request, - on_init, - } => { - let msg = StreamingRpcChannelMsg::Request(id, *request.clone()); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - if let Some(on_init) = on_init { - store.dispatch_callback(on_init, (peer_id, id, *request)); - } - } - P2pChannelsStreamingRpcEffectfulAction::ResponseNextPartGet { peer_id, id } => { - let msg = StreamingRpcChannelMsg::Next(id); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - P2pChannelsStreamingRpcEffectfulAction::ResponseSendInit { - peer_id, - id, - response, - } => { - if response.is_none() { - let msg = StreamingRpcChannelMsg::Response(id, None); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - - store.dispatch(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id }); - return; - } - store.dispatch(P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id }); - } - P2pChannelsStreamingRpcEffectfulAction::ResponsePartSend { - peer_id, - id, - response, - } => { - let msg = StreamingRpcChannelMsg::Response(id, Some(*response)); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - store.dispatch(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id }); - } - } - } -} diff --git a/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs b/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs index c7f7a8c60a..bf845a2d97 100644 --- a/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs +++ b/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs @@ -1,9 +1,10 @@ use super::{ - P2pChannelsTransactionAction, P2pChannelsTransactionState, TransactionPropagationState, + P2pChannelsTransactionAction, P2pChannelsTransactionState, TransactionPropagationChannelMsg, + TransactionPropagationState, }; use crate::{ - channels::transaction_effectful::P2pChannelsTransactionEffectfulAction, P2pNetworkPubsubAction, - P2pState, + channels::{ChannelId, MsgId, P2pChannelsEffectfulAction}, + P2pNetworkPubsubAction, P2pState, }; use mina_p2p_messages::{gossip::GossipNetMessageV2, v2}; use openmina_core::{bug_condition, Substate}; @@ -33,7 +34,15 @@ impl P2pChannelsTransactionState { *state = Self::Init { time: meta.time() }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsTransactionEffectfulAction::Init { peer_id }); + dispatcher.push(P2pChannelsEffectfulAction::InitChannel { + peer_id, + id: ChannelId::TransactionPropagation, + on_success: redux::callback!( + on_transaction_channel_init(peer_id: crate::PeerId) -> crate::P2pAction { + P2pChannelsTransactionAction::Pending { peer_id } + } + ), + }); Ok(()) } P2pChannelsTransactionAction::Pending { .. } => { @@ -66,8 +75,11 @@ impl P2pChannelsTransactionState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher - .push(P2pChannelsTransactionEffectfulAction::RequestSend { peer_id, limit }); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg: TransactionPropagationChannelMsg::GetNext { limit }.into(), + }); Ok(()) } P2pChannelsTransactionAction::PromiseReceived { promised_count, .. } => { @@ -172,10 +184,21 @@ impl P2pChannelsTransactionState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pChannelsTransactionEffectfulAction::ResponseSend { + let msg = TransactionPropagationChannelMsg::WillSend { count }.into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { peer_id, - transactions, + msg_id: MsgId::first(), + msg, }); + + for tx in transactions { + let msg = TransactionPropagationChannelMsg::Transaction(tx).into(); + dispatcher.push(P2pChannelsEffectfulAction::MessageSend { + peer_id, + msg_id: MsgId::first(), + msg, + }); + } Ok(()) } P2pChannelsTransactionAction::Libp2pReceived { transaction, .. } => { diff --git a/p2p/src/channels/transaction_effectful/mod.rs b/p2p/src/channels/transaction_effectful/mod.rs deleted file mode 100644 index 470d454098..0000000000 --- a/p2p/src/channels/transaction_effectful/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod p2p_channels_transaction_effectful_actions; -pub use p2p_channels_transaction_effectful_actions::P2pChannelsTransactionEffectfulAction; -mod p2p_channels_transaction_effectful_effects; diff --git a/p2p/src/channels/transaction_effectful/p2p_channels_transaction_effectful_actions.rs b/p2p/src/channels/transaction_effectful/p2p_channels_transaction_effectful_actions.rs deleted file mode 100644 index 82728a0e0e..0000000000 --- a/p2p/src/channels/transaction_effectful/p2p_channels_transaction_effectful_actions.rs +++ /dev/null @@ -1,33 +0,0 @@ -use openmina_core::transaction::TransactionInfo; -use openmina_core::ActionEvent; -use serde::{Deserialize, Serialize}; - -use crate::{channels::P2pChannelsEffectfulAction, P2pState, PeerId}; - -#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)] -#[action_event(fields(display(peer_id)))] -pub enum P2pChannelsTransactionEffectfulAction { - Init { - peer_id: PeerId, - }, - RequestSend { - peer_id: PeerId, - limit: u8, - }, - ResponseSend { - peer_id: PeerId, - transactions: Vec, - }, -} - -impl redux::EnablingCondition for P2pChannelsTransactionEffectfulAction { - fn is_enabled(&self, _state: &P2pState, _time: redux::Timestamp) -> bool { - true - } -} - -impl From for crate::P2pEffectfulAction { - fn from(action: P2pChannelsTransactionEffectfulAction) -> crate::P2pEffectfulAction { - crate::P2pEffectfulAction::Channels(P2pChannelsEffectfulAction::Transaction(action)) - } -} diff --git a/p2p/src/channels/transaction_effectful/p2p_channels_transaction_effectful_effects.rs b/p2p/src/channels/transaction_effectful/p2p_channels_transaction_effectful_effects.rs deleted file mode 100644 index 94d90e621c..0000000000 --- a/p2p/src/channels/transaction_effectful/p2p_channels_transaction_effectful_effects.rs +++ /dev/null @@ -1,51 +0,0 @@ -use super::P2pChannelsTransactionEffectfulAction; -use crate::channels::{ - transaction::{P2pChannelsTransactionAction, TransactionPropagationChannelMsg}, - ChannelId, MsgId, P2pChannelsService, -}; -use redux::ActionMeta; - -impl P2pChannelsTransactionEffectfulAction { - pub fn effects(self, _: &ActionMeta, store: &mut Store) - where - Store: crate::P2pStore, - Store::Service: P2pChannelsService, - { - match self { - P2pChannelsTransactionEffectfulAction::Init { peer_id } => { - store - .service() - .channel_open(peer_id, ChannelId::TransactionPropagation); - store.dispatch(P2pChannelsTransactionAction::Pending { peer_id }); - } - P2pChannelsTransactionEffectfulAction::RequestSend { peer_id, limit } => { - let msg = TransactionPropagationChannelMsg::GetNext { limit }; - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - P2pChannelsTransactionEffectfulAction::ResponseSend { - peer_id, - transactions, - } => { - if transactions.is_empty() { - return; - } - - let msg = TransactionPropagationChannelMsg::WillSend { - count: transactions.len() as u8, - }; - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - - for tx in transactions { - let msg = TransactionPropagationChannelMsg::Transaction(tx); - store - .service() - .channel_send(peer_id, MsgId::first(), msg.into()); - } - } - } - } -} diff --git a/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs b/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs index de0acbd268..744880f747 100644 --- a/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs +++ b/p2p/src/connection/outgoing/p2p_connection_outgoing_reducer.rs @@ -207,13 +207,11 @@ impl P2pConnectionOutgoingState { if let Some(relay_peer_id) = opts.webrtc_p2p_relay_peer_id() { dispatcher.push(P2pChannelsSignalingDiscoveryAction::DiscoveredAccept { peer_id: relay_peer_id, - offer: offer.clone(), + offer, }); } else { - dispatcher.push(P2pConnectionOutgoingEffectfulAction::OfferSend { - peer_id, - offer: offer.clone(), - }); + dispatcher + .push(P2pConnectionOutgoingEffectfulAction::OfferSend { peer_id, offer }); } Ok(()) } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index 92000312c4..ed642d2939 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -7,23 +7,16 @@ pub mod identity; use bootstrap::P2pNetworkKadBootstrapState; use channels::{ best_tip::P2pChannelsBestTipAction, - best_tip_effectful::P2pChannelsBestTipEffectfulAction, rpc::P2pChannelsRpcAction, - rpc_effectful::P2pChannelsRpcEffectfulAction, signaling::{ discovery::P2pChannelsSignalingDiscoveryAction, - discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, exchange::P2pChannelsSignalingExchangeAction, - exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, }, snark::P2pChannelsSnarkAction, - snark_effectful::P2pChannelsSnarkEffectfulAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, - snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, streaming_rpc::P2pChannelsStreamingRpcAction, - streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction, transaction::P2pChannelsTransactionAction, - transaction_effectful::P2pChannelsTransactionEffectfulAction, + P2pChannelsEffectfulAction, }; use connection::{ incoming::P2pConnectionIncomingAction, @@ -136,9 +129,7 @@ pub trait P2pActionTrait: + From + From + From - + From + From - + From + From + From + From @@ -151,12 +142,9 @@ pub trait P2pActionTrait: + From + From + From - + From - + From - + From - + From - + From - + From + From + + From { } + +pub trait Action: From {} diff --git a/p2p/src/p2p_effects.rs b/p2p/src/p2p_effects.rs index 5a360e5722..f29d9f5625 100644 --- a/p2p/src/p2p_effects.rs +++ b/p2p/src/p2p_effects.rs @@ -1,7 +1,4 @@ -use crate::{ - channels::P2pChannelsEffectfulAction, connection::P2pConnectionEffectfulAction, - P2pEffectfulAction, P2pStore, -}; +use crate::{connection::P2pConnectionEffectfulAction, P2pEffectfulAction, P2pStore}; use redux::ActionMeta; impl P2pEffectfulAction { @@ -12,22 +9,7 @@ impl P2pEffectfulAction { { match self { P2pEffectfulAction::Initialize => {} - P2pEffectfulAction::Channels(action) => match action { - P2pChannelsEffectfulAction::SignalingDiscovery(action) => { - action.effects(&meta, store) - } - P2pChannelsEffectfulAction::SignalingExchange(action) => { - action.effects(&meta, store) - } - P2pChannelsEffectfulAction::BestTip(action) => action.effects(&meta, store), - P2pChannelsEffectfulAction::Transaction(action) => action.effects(&meta, store), - P2pChannelsEffectfulAction::StreamingRpc(action) => action.effects(&meta, store), - P2pChannelsEffectfulAction::SnarkJobCommitment(action) => { - action.effects(&meta, store) - } - P2pChannelsEffectfulAction::Rpc(action) => action.effects(&meta, store), - P2pChannelsEffectfulAction::Snark(action) => action.effects(&meta, store), - }, + P2pEffectfulAction::Channels(action) => action.effects(&meta, store), P2pEffectfulAction::Connection(action) => match action { P2pConnectionEffectfulAction::Outgoing(action) => action.effects(&meta, store), P2pConnectionEffectfulAction::Incoming(action) => action.effects(&meta, store), diff --git a/p2p/testing/src/redux.rs b/p2p/testing/src/redux.rs index 674680705c..bff9051f43 100644 --- a/p2p/testing/src/redux.rs +++ b/p2p/testing/src/redux.rs @@ -13,23 +13,16 @@ use p2p::{ bootstrap::P2pNetworkKadBootstrapState, channels::{ best_tip::P2pChannelsBestTipAction, - best_tip_effectful::P2pChannelsBestTipEffectfulAction, rpc::P2pChannelsRpcAction, - rpc_effectful::P2pChannelsRpcEffectfulAction, signaling::{ discovery::P2pChannelsSignalingDiscoveryAction, - discovery_effectful::P2pChannelsSignalingDiscoveryEffectfulAction, exchange::P2pChannelsSignalingExchangeAction, - exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction, }, snark::P2pChannelsSnarkAction, - snark_effectful::P2pChannelsSnarkEffectfulAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction, - snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction, streaming_rpc::P2pChannelsStreamingRpcAction, - streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction, transaction::P2pChannelsTransactionAction, - transaction_effectful::P2pChannelsTransactionEffectfulAction, + P2pChannelsEffectfulAction, }, connection::{ incoming_effectful::P2pConnectionIncomingEffectfulAction, @@ -146,7 +139,10 @@ pub enum Action { impl From for Action { fn from(action: redux::AnyAction) -> Self { - *action.0.downcast::().expect("Downcast failed") + match action.0.downcast() { + Ok(action) => *action, + Err(action) => Self::P2p(*action.downcast().expect("Downcast failed")), + } } } @@ -317,13 +313,6 @@ impl_from_p2p!(effectful p2p::P2pNetworkPubsubEffectfulAction); impl_from_p2p!(effectful P2pNetworkIdentifyStreamEffectfulAction); impl_from_p2p!(effectful P2pConnectionOutgoingEffectfulAction); impl_from_p2p!(effectful P2pDisconnectionEffectfulAction); -impl_from_p2p!(effectful P2pChannelsSignalingDiscoveryEffectfulAction); -impl_from_p2p!(effectful P2pChannelsSignalingExchangeEffectfulAction); -impl_from_p2p!(effectful P2pChannelsBestTipEffectfulAction); -impl_from_p2p!(effectful P2pChannelsStreamingRpcEffectfulAction); -impl_from_p2p!(effectful P2pChannelsTransactionEffectfulAction); -impl_from_p2p!(effectful P2pChannelsSnarkJobCommitmentEffectfulAction); -impl_from_p2p!(effectful P2pChannelsRpcEffectfulAction); -impl_from_p2p!(effectful P2pChannelsSnarkEffectfulAction); +impl_from_p2p!(effectful P2pChannelsEffectfulAction); impl p2p::P2pActionTrait for Action {}