Skip to content

Commit f224013

Browse files
authored
Merge pull request #955 from openmina/tweak/pubsub
feat(p2p): More pubsub improvements, docs, cleanup
2 parents 16c8ae6 + 2dd1d47 commit f224013

File tree

6 files changed

+370
-156
lines changed

6 files changed

+370
-156
lines changed

node/src/action_kind.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,9 @@ pub enum ActionKind {
413413
P2pNetworkPubsubPrune,
414414
P2pNetworkPubsubSign,
415415
P2pNetworkPubsubSignError,
416-
P2pNetworkPubsubEffectfulIncomingData,
416+
P2pNetworkPubsubValidateIncomingMessages,
417417
P2pNetworkPubsubEffectfulSign,
418+
P2pNetworkPubsubEffectfulValidateIncomingMessages,
418419
P2pNetworkRpcHeartbeatSend,
419420
P2pNetworkRpcIncomingData,
420421
P2pNetworkRpcIncomingMessage,
@@ -688,7 +689,7 @@ pub enum ActionKind {
688689
}
689690

690691
impl ActionKind {
691-
pub const COUNT: u16 = 579;
692+
pub const COUNT: u16 = 580;
692693
}
693694

694695
impl std::fmt::Display for ActionKind {
@@ -1892,6 +1893,9 @@ impl ActionKindGet for P2pNetworkPubsubAction {
18921893
match self {
18931894
Self::NewStream { .. } => ActionKind::P2pNetworkPubsubNewStream,
18941895
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubIncomingData,
1896+
Self::ValidateIncomingMessages { .. } => {
1897+
ActionKind::P2pNetworkPubsubValidateIncomingMessages
1898+
}
18951899
Self::IncomingMessage { .. } => ActionKind::P2pNetworkPubsubIncomingMessage,
18961900
Self::IncomingMessageCleanup { .. } => {
18971901
ActionKind::P2pNetworkPubsubIncomingMessageCleanup
@@ -1994,7 +1998,9 @@ impl ActionKindGet for P2pNetworkPubsubEffectfulAction {
19941998
fn kind(&self) -> ActionKind {
19951999
match self {
19962000
Self::Sign { .. } => ActionKind::P2pNetworkPubsubEffectfulSign,
1997-
Self::IncomingData { .. } => ActionKind::P2pNetworkPubsubEffectfulIncomingData,
2001+
Self::ValidateIncomingMessages { .. } => {
2002+
ActionKind::P2pNetworkPubsubEffectfulValidateIncomingMessages
2003+
}
19982004
}
19992005
}
20002006
}

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 84 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,70 +4,119 @@ use mina_p2p_messages::gossip::GossipNetMessageV2;
44
use openmina_core::ActionEvent;
55
use serde::{Deserialize, Serialize};
66

7+
/// Actions that can occur within the P2P Network PubSub system.
8+
///
9+
/// Managing pubsub streams, handling incoming and outgoing messages,
10+
/// and maintaining the mesh network topology.
11+
///
12+
/// **Common Fields:**
13+
/// - `peer_id`: The identifier of the peer associated with the action.
14+
/// - `addr`: The connection address of the peer.
15+
/// - `stream_id`: The unique identifier of the stream.
16+
/// - `topic_id`: The identifier of the topic involved in the action.
717
#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
818
pub enum P2pNetworkPubsubAction {
19+
/// Create a new stream, either incoming or outgoing.
20+
///
21+
/// **Fields:**
22+
/// - `incoming`: Indicates if the stream is incoming (`true`) or outgoing (`false`).
23+
/// - `protocol`: The broadcast algorithm used for the stream.
924
NewStream {
1025
incoming: bool,
1126
peer_id: PeerId,
1227
addr: ConnectionAddr,
1328
stream_id: StreamId,
1429
protocol: BroadcastAlgorithm,
1530
},
31+
32+
/// Process incoming raw data from a peer.
33+
///
34+
/// **Fields:**
35+
/// - `data`: The raw data payload received.
36+
/// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
1637
IncomingData {
1738
peer_id: PeerId,
1839
addr: ConnectionAddr,
1940
stream_id: StreamId,
2041
data: Data,
2142
seen_limit: usize,
2243
},
23-
IncomingMessage {
44+
45+
/// Validate a batch of decoded incoming messages.
46+
ValidateIncomingMessages {
2447
peer_id: PeerId,
25-
message: pb::Message,
2648
seen_limit: usize,
49+
addr: ConnectionAddr,
2750
},
28-
IncomingMessageCleanup {
29-
peer_id: PeerId,
30-
},
31-
Graft {
32-
peer_id: PeerId,
33-
topic_id: String,
34-
},
35-
Prune {
51+
52+
/// Handle a fully decoded and validated message received from a peer.
53+
///
54+
/// **Fields:**
55+
/// - `message`: The decoded protobuf message.
56+
/// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
57+
IncomingMessage {
3658
peer_id: PeerId,
37-
topic_id: String,
38-
},
39-
Broadcast {
40-
message: Box<GossipNetMessageV2>,
59+
message: pb::Message,
60+
seen_limit: usize,
4161
},
62+
63+
/// Clean up temporary states after processing an incoming message.
64+
IncomingMessageCleanup { peer_id: PeerId },
65+
66+
/// Add a peer to the mesh network for a specific topic.
67+
Graft { peer_id: PeerId, topic_id: String },
68+
69+
/// Remove a peer from the mesh network for a specific topic.
70+
Prune { peer_id: PeerId, topic_id: String },
71+
72+
/// Initiate the broadcasting of a message to all subscribed peers.
73+
///
74+
/// **Fields:**
75+
/// - `message`: The gossip network message to broadcast.
76+
Broadcast { message: Box<GossipNetMessageV2> },
77+
78+
/// Prepare a message for signing before broadcasting.
79+
///
80+
/// **Fields:**
81+
/// - `seqno`: The sequence number of the message.
82+
/// - `author`: The identifier of the peer authoring the message.
83+
/// - `data`: The data payload of the message.
84+
/// - `topic`: The topic under which the message is published.
4285
Sign {
4386
seqno: u64,
4487
author: PeerId,
4588
data: Data,
4689
topic: String,
4790
},
91+
92+
/// An error occured during the signing process.
4893
#[action_event(level = warn, fields(display(author), display(topic)))]
49-
SignError {
50-
author: PeerId,
51-
topic: String,
52-
},
53-
BroadcastSigned {
54-
signature: Data,
55-
},
56-
OutgoingMessage {
57-
peer_id: PeerId,
58-
},
59-
OutgoingMessageClear {
60-
peer_id: PeerId,
61-
},
94+
SignError { author: PeerId, topic: String },
95+
96+
/// Finalize the broadcasting of a signed message by attaching the signature.
97+
///
98+
/// **Fields:**
99+
/// - `signature`: The cryptographic signature of the message.
100+
BroadcastSigned { signature: Data },
101+
102+
/// Prepare an outgoing message to send to a specific peer.
103+
OutgoingMessage { peer_id: PeerId },
104+
105+
/// Clear the outgoing message state for a specific peer after sending.
106+
OutgoingMessageClear { peer_id: PeerId },
107+
108+
/// An error occured during the sending of an outgoing message.
109+
///
110+
/// **Fields:**
111+
/// - `msg`: The protobuf message that failed to send.
62112
#[action_event(level = warn, fields(display(peer_id), debug(msg)))]
63-
OutgoingMessageError {
64-
msg: pb::Rpc,
65-
peer_id: PeerId,
66-
},
67-
OutgoingData {
68-
data: Data,
69-
peer_id: PeerId,
70-
},
113+
OutgoingMessageError { msg: pb::Rpc, peer_id: PeerId },
114+
115+
/// Send encoded data over an outgoing stream to a specific peer.
116+
///
117+
/// **Fields:**
118+
/// - `data`: The encoded data to be sent.
119+
OutgoingData { data: Data, peer_id: PeerId },
71120
}
72121

73122
impl From<P2pNetworkPubsubAction> for crate::P2pAction {

0 commit comments

Comments
 (0)