Skip to content

Commit 2de1d06

Browse files
committed
feat(p2p/webrtc): implement channel for exchanging relayed signaling messages
re: #772
1 parent 27d2209 commit 2de1d06

34 files changed

+995
-36
lines changed

node/common/src/service/p2p.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ use std::collections::BTreeMap;
33
use node::{
44
core::channels::mpsc,
55
event_source::Event,
6-
p2p::{connection::outgoing::P2pConnectionOutgoingInitOpts, PeerId},
6+
p2p::{
7+
connection::outgoing::P2pConnectionOutgoingInitOpts,
8+
identity::{EncryptableType, PublicKey},
9+
PeerId,
10+
},
711
};
812
use rand::prelude::*;
913
#[cfg(feature = "p2p-libp2p")]
@@ -34,6 +38,23 @@ impl webrtc::P2pServiceWebrtc for NodeService {
3438
fn peers(&mut self) -> &mut BTreeMap<PeerId, webrtc::PeerState> {
3539
&mut self.p2p.webrtc.peers
3640
}
41+
42+
fn encrypt<T: EncryptableType>(
43+
&mut self,
44+
other_pk: &PublicKey,
45+
message: &T,
46+
) -> Result<T::Encrypted, ()> {
47+
let rng = &mut self.rng;
48+
self.p2p.sec_key.encrypt(other_pk, rng, message)
49+
}
50+
51+
fn decrypt<T: EncryptableType>(
52+
&mut self,
53+
other_pk: &PublicKey,
54+
encrypted: &T::Encrypted,
55+
) -> Result<T, ()> {
56+
self.p2p.sec_key.decrypt(other_pk, encrypted)
57+
}
3758
}
3859

3960
impl webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p for NodeService {

node/src/action_kind.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
2828
use crate::p2p::channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction;
2929
use crate::p2p::channels::rpc::P2pChannelsRpcAction;
3030
use crate::p2p::channels::rpc_effectful::P2pChannelsRpcEffectfulAction;
31+
use crate::p2p::channels::signaling::exchange::P2pChannelsSignalingExchangeAction;
32+
use crate::p2p::channels::signaling::exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction;
3133
use crate::p2p::channels::snark::P2pChannelsSnarkAction;
3234
use crate::p2p::channels::snark_effectful::P2pChannelsSnarkEffectfulAction;
3335
use crate::p2p::channels::snark_job_commitment::P2pChannelsSnarkJobCommitmentAction;
@@ -209,6 +211,21 @@ pub enum ActionKind {
209211
P2pChannelsRpcEffectfulInit,
210212
P2pChannelsRpcEffectfulRequestSend,
211213
P2pChannelsRpcEffectfulResponseSend,
214+
P2pChannelsSignalingExchangeAnswerReceived,
215+
P2pChannelsSignalingExchangeAnswerSend,
216+
P2pChannelsSignalingExchangeInit,
217+
P2pChannelsSignalingExchangeOfferDecryptError,
218+
P2pChannelsSignalingExchangeOfferDecryptSuccess,
219+
P2pChannelsSignalingExchangeOfferReceived,
220+
P2pChannelsSignalingExchangeOfferSend,
221+
P2pChannelsSignalingExchangePending,
222+
P2pChannelsSignalingExchangeReady,
223+
P2pChannelsSignalingExchangeRequestReceived,
224+
P2pChannelsSignalingExchangeRequestSend,
225+
P2pChannelsSignalingExchangeEffectfulAnswerEncryptAndSend,
226+
P2pChannelsSignalingExchangeEffectfulInit,
227+
P2pChannelsSignalingExchangeEffectfulMessageSend,
228+
P2pChannelsSignalingExchangeEffectfulOfferDecrypt,
212229
P2pChannelsSnarkInit,
213230
P2pChannelsSnarkLibp2pBroadcast,
214231
P2pChannelsSnarkLibp2pReceived,
@@ -618,7 +635,7 @@ pub enum ActionKind {
618635
}
619636

620637
impl ActionKind {
621-
pub const COUNT: u16 = 510;
638+
pub const COUNT: u16 = 525;
622639
}
623640

624641
impl std::fmt::Display for ActionKind {
@@ -1029,6 +1046,7 @@ impl ActionKindGet for P2pChannelsAction {
10291046
fn kind(&self) -> ActionKind {
10301047
match self {
10311048
Self::MessageReceived(a) => a.kind(),
1049+
Self::SignalingExchange(a) => a.kind(),
10321050
Self::BestTip(a) => a.kind(),
10331051
Self::Transaction(a) => a.kind(),
10341052
Self::Snark(a) => a.kind(),
@@ -1069,6 +1087,7 @@ impl ActionKindGet for P2pNetworkAction {
10691087
impl ActionKindGet for P2pChannelsEffectfulAction {
10701088
fn kind(&self) -> ActionKind {
10711089
match self {
1090+
Self::SignalingExchange(a) => a.kind(),
10721091
Self::BestTip(a) => a.kind(),
10731092
Self::Rpc(a) => a.kind(),
10741093
Self::Snark(a) => a.kind(),
@@ -1400,6 +1419,28 @@ impl ActionKindGet for P2pChannelsMessageReceivedAction {
14001419
}
14011420
}
14021421

1422+
impl ActionKindGet for P2pChannelsSignalingExchangeAction {
1423+
fn kind(&self) -> ActionKind {
1424+
match self {
1425+
Self::Init { .. } => ActionKind::P2pChannelsSignalingExchangeInit,
1426+
Self::Pending { .. } => ActionKind::P2pChannelsSignalingExchangePending,
1427+
Self::Ready { .. } => ActionKind::P2pChannelsSignalingExchangeReady,
1428+
Self::RequestSend { .. } => ActionKind::P2pChannelsSignalingExchangeRequestSend,
1429+
Self::OfferReceived { .. } => ActionKind::P2pChannelsSignalingExchangeOfferReceived,
1430+
Self::OfferDecryptError { .. } => {
1431+
ActionKind::P2pChannelsSignalingExchangeOfferDecryptError
1432+
}
1433+
Self::OfferDecryptSuccess { .. } => {
1434+
ActionKind::P2pChannelsSignalingExchangeOfferDecryptSuccess
1435+
}
1436+
Self::AnswerSend { .. } => ActionKind::P2pChannelsSignalingExchangeAnswerSend,
1437+
Self::RequestReceived { .. } => ActionKind::P2pChannelsSignalingExchangeRequestReceived,
1438+
Self::OfferSend { .. } => ActionKind::P2pChannelsSignalingExchangeOfferSend,
1439+
Self::AnswerReceived { .. } => ActionKind::P2pChannelsSignalingExchangeAnswerReceived,
1440+
}
1441+
}
1442+
}
1443+
14031444
impl ActionKindGet for P2pChannelsBestTipAction {
14041445
fn kind(&self) -> ActionKind {
14051446
match self {
@@ -1652,6 +1693,23 @@ impl ActionKindGet for P2pNetworkRpcAction {
16521693
}
16531694
}
16541695

1696+
impl ActionKindGet for P2pChannelsSignalingExchangeEffectfulAction {
1697+
fn kind(&self) -> ActionKind {
1698+
match self {
1699+
Self::Init { .. } => ActionKind::P2pChannelsSignalingExchangeEffectfulInit,
1700+
Self::MessageSend { .. } => {
1701+
ActionKind::P2pChannelsSignalingExchangeEffectfulMessageSend
1702+
}
1703+
Self::OfferDecrypt { .. } => {
1704+
ActionKind::P2pChannelsSignalingExchangeEffectfulOfferDecrypt
1705+
}
1706+
Self::AnswerEncryptAndSend { .. } => {
1707+
ActionKind::P2pChannelsSignalingExchangeEffectfulAnswerEncryptAndSend
1708+
}
1709+
}
1710+
}
1711+
}
1712+
16551713
impl ActionKindGet for P2pChannelsBestTipEffectfulAction {
16561714
fn kind(&self) -> ActionKind {
16571715
match self {

node/src/event_source/event_source_effects.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use p2p::channels::signaling::exchange::P2pChannelsSignalingExchangeAction;
12
use p2p::channels::snark::P2pChannelsSnarkAction;
23
use p2p::channels::streaming_rpc::P2pChannelsStreamingRpcAction;
34
use p2p::channels::transaction::P2pChannelsTransactionAction;
@@ -200,8 +201,12 @@ pub fn event_source_effects<S: Service>(store: &mut Store<S>, action: EventSourc
200201
openmina_core::log::warn!(meta.time(); kind = "P2pChannelEvent::Opened", peer_id = peer_id.to_string(), error = err);
201202
// TODO(binier): dispatch error action.
202203
}
203-
// TODO(binier): maybe dispatch success and then ready.
204204
Ok(_) => match chan_id {
205+
ChannelId::SignalingExchange => {
206+
store.dispatch(P2pChannelsSignalingExchangeAction::Ready {
207+
peer_id,
208+
});
209+
}
205210
ChannelId::BestTipPropagation => {
206211
store.dispatch(P2pChannelsBestTipAction::Ready { peer_id });
207212
}

node/src/logger/logger_effects.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ pub fn logger_effects<S: Service>(store: &Store<S>, action: ActionWithMetaRef<'_
6969
P2pAction::Identify(action) => action.action_event(&context),
7070
P2pAction::Channels(action) => match action {
7171
P2pChannelsAction::MessageReceived(action) => action.action_event(&context),
72+
P2pChannelsAction::SignalingExchange(action) => action.action_event(&context),
7273
P2pChannelsAction::BestTip(action) => action.action_event(&context),
7374
P2pChannelsAction::Transaction(action) => action.action_event(&context),
7475
P2pChannelsAction::Snark(action) => action.action_event(&context),
@@ -105,6 +106,9 @@ pub fn logger_effects<S: Service>(store: &Store<S>, action: ActionWithMetaRef<'_
105106
},
106107
Action::P2pEffectful(action) => match action {
107108
p2p::P2pEffectfulAction::Channels(action) => match action {
109+
P2pChannelsEffectfulAction::SignalingExchange(action) => {
110+
action.action_event(&context)
111+
}
108112
P2pChannelsEffectfulAction::BestTip(action) => action.action_event(&context),
109113
P2pChannelsEffectfulAction::Rpc(action) => action.action_event(&context),
110114
P2pChannelsEffectfulAction::StreamingRpc(action) => action.action_event(&context),

node/src/p2p/channels/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub use ::p2p::channels::*;
22

33
pub mod best_tip;
44
pub mod rpc;
5+
pub mod signaling;
56
pub mod snark;
67
pub mod snark_job_commitment;
78
pub mod streaming_rpc;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub use ::p2p::channels::signaling::*;
2+
3+
mod p2p_channels_signaling_exchange_actions;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
use super::exchange::*;
2+
3+
impl redux::EnablingCondition<crate::State> for P2pChannelsSignalingExchangeAction {
4+
fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
5+
state.p2p.is_enabled(self, time)
6+
}
7+
}

node/src/p2p/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use p2p::{
33
channels::{
44
best_tip_effectful::P2pChannelsBestTipEffectfulAction,
55
rpc_effectful::P2pChannelsRpcEffectfulAction,
6+
signaling::exchange_effectful::P2pChannelsSignalingExchangeEffectfulAction,
67
snark_effectful::P2pChannelsSnarkEffectfulAction,
78
snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction,
89
streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction,
@@ -107,6 +108,7 @@ impl_into_global_action!(network::kad::P2pNetworkKademliaAction);
107108
impl_into_global_action!(network::pubsub::P2pNetworkPubsubAction);
108109

109110
impl_into_global_action!(channels::P2pChannelsMessageReceivedAction);
111+
impl_into_global_action!(channels::signaling::exchange::P2pChannelsSignalingExchangeAction);
110112
impl_into_global_action!(channels::best_tip::P2pChannelsBestTipAction);
111113
impl_into_global_action!(channels::transaction::P2pChannelsTransactionAction);
112114
impl_into_global_action!(channels::snark::P2pChannelsSnarkAction);
@@ -132,6 +134,7 @@ impl_into_global_action!(effectful p2p::P2pNetworkPnetEffectfulAction);
132134
impl_into_global_action!(effectful connection::incoming_effectful::P2pConnectionIncomingEffectfulAction);
133135
impl_into_global_action!(effectful connection::outgoing_effectful::P2pConnectionOutgoingEffectfulAction);
134136
impl_into_global_action!(effectful p2p::disconnection_effectful::P2pDisconnectionEffectfulAction);
137+
impl_into_global_action!(effectful P2pChannelsSignalingExchangeEffectfulAction);
135138
impl_into_global_action!(effectful P2pChannelsBestTipEffectfulAction);
136139
impl_into_global_action!(effectful P2pChannelsStreamingRpcEffectfulAction);
137140
impl_into_global_action!(effectful P2pChannelsTransactionEffectfulAction);

node/testing/src/service/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,22 @@ impl P2pServiceWebrtc for NodeTestingService {
359359
fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
360360
P2pServiceWebrtc::incoming_init(&mut self.real, peer_id, offer)
361361
}
362+
363+
fn encrypt<T: node::p2p::identity::EncryptableType>(
364+
&mut self,
365+
other_pk: &node::p2p::identity::PublicKey,
366+
message: &T,
367+
) -> Result<T::Encrypted, ()> {
368+
self.real.encrypt(other_pk, message)
369+
}
370+
371+
fn decrypt<T: node::p2p::identity::EncryptableType>(
372+
&mut self,
373+
other_pub_key: &node::p2p::identity::PublicKey,
374+
encrypted: &T::Encrypted,
375+
) -> Result<T, ()> {
376+
self.real.decrypt(other_pub_key, encrypted)
377+
}
362378
}
363379

364380
impl P2pServiceWebrtcWithLibp2p for NodeTestingService {

p2p/src/channels/mod.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod best_tip;
22
pub mod best_tip_effectful;
33
pub mod rpc;
44
pub mod rpc_effectful;
5+
pub mod signaling;
56
pub mod snark;
67
pub mod snark_effectful;
78
pub mod snark_job_commitment;
@@ -26,6 +27,7 @@ use binprot::{BinProtRead, BinProtWrite};
2627
use binprot_derive::{BinProtRead, BinProtWrite};
2728
use derive_more::From;
2829
use serde::{Deserialize, Serialize};
30+
use signaling::exchange::SignalingExchangeChannelMsg;
2931
use strum_macros::EnumIter;
3032

3133
use self::best_tip::BestTipPropagationChannelMsg;
@@ -38,12 +40,13 @@ use self::transaction::TransactionPropagationChannelMsg;
3840
#[derive(Serialize, Deserialize, EnumIter, Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
3941
#[repr(u8)]
4042
pub enum ChannelId {
43+
SignalingExchange = 1,
4144
BestTipPropagation = 2,
4245
TransactionPropagation = 3,
4346
SnarkPropagation = 4,
4447
SnarkJobCommitmentPropagation = 5,
45-
Rpc = 100,
46-
StreamingRpc = 101,
48+
Rpc = 6,
49+
StreamingRpc = 7,
4750
}
4851

4952
impl ChannelId {
@@ -59,6 +62,7 @@ impl ChannelId {
5962

6063
pub fn name(self) -> &'static str {
6164
match self {
65+
Self::SignalingExchange => "signaling/exchange",
6266
Self::BestTipPropagation => "best_tip/propagation",
6367
Self::TransactionPropagation => "transaction/propagation",
6468
Self::SnarkPropagation => "snark/propagation",
@@ -70,6 +74,7 @@ impl ChannelId {
7074

7175
pub fn supported_by_libp2p(self) -> bool {
7276
match self {
77+
Self::SignalingExchange => false,
7378
Self::BestTipPropagation => true,
7479
Self::TransactionPropagation => true,
7580
Self::SnarkPropagation => true,
@@ -81,6 +86,8 @@ impl ChannelId {
8186

8287
pub fn max_msg_size(self) -> usize {
8388
match self {
89+
// TODO(binier): measure signaling message sizes
90+
Self::SignalingExchange => 16 * 1024, // 16KB
8491
// TODO(binier): reduce this value once we change message for best tip
8592
// propagation to just propagating consensus state with block hash.
8693
Self::BestTipPropagation => 32 * 1024 * 1024, // 32MB
@@ -122,6 +129,7 @@ impl MsgId {
122129

123130
#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)]
124131
pub enum ChannelMsg {
132+
SignalingExchange(SignalingExchangeChannelMsg),
125133
BestTipPropagation(BestTipPropagationChannelMsg),
126134
TransactionPropagation(TransactionPropagationChannelMsg),
127135
SnarkPropagation(SnarkPropagationChannelMsg),
@@ -133,6 +141,7 @@ pub enum ChannelMsg {
133141
impl ChannelMsg {
134142
pub fn channel_id(&self) -> ChannelId {
135143
match self {
144+
Self::SignalingExchange(_) => ChannelId::SignalingExchange,
136145
Self::BestTipPropagation(_) => ChannelId::BestTipPropagation,
137146
Self::TransactionPropagation(_) => ChannelId::TransactionPropagation,
138147
Self::SnarkPropagation(_) => ChannelId::SnarkPropagation,
@@ -147,6 +156,7 @@ impl ChannelMsg {
147156
W: std::io::Write,
148157
{
149158
match self {
159+
Self::SignalingExchange(v) => v.binprot_write(w),
150160
Self::BestTipPropagation(v) => v.binprot_write(w),
151161
Self::TransactionPropagation(v) => v.binprot_write(w),
152162
Self::SnarkPropagation(v) => v.binprot_write(w),
@@ -162,6 +172,9 @@ impl ChannelMsg {
162172
R: std::io::Read + ?Sized,
163173
{
164174
match id {
175+
ChannelId::SignalingExchange => {
176+
SignalingExchangeChannelMsg::binprot_read(r).map(|v| v.into())
177+
}
165178
ChannelId::BestTipPropagation => {
166179
BestTipPropagationChannelMsg::binprot_read(r).map(|v| v.into())
167180
}
@@ -194,6 +207,11 @@ impl crate::P2pState {
194207
// exhaustive matching so that we don't miss any channels.
195208
for id in self.config.enabled_channels.iter().copied() {
196209
match id {
210+
ChannelId::SignalingExchange => {
211+
dispatcher.push(
212+
signaling::exchange::P2pChannelsSignalingExchangeAction::Init { peer_id },
213+
);
214+
}
197215
ChannelId::BestTipPropagation => {
198216
dispatcher.push(best_tip::P2pChannelsBestTipAction::Init { peer_id });
199217
}

0 commit comments

Comments
 (0)