Skip to content

Commit d20387f

Browse files
committed
doc(p2p): Document pubsub actions and state
1 parent dd8dc07 commit d20387f

File tree

3 files changed

+141
-36
lines changed

3 files changed

+141
-36
lines changed

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 77 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,70 +4,112 @@ use mina_p2p_messages::gossip::GossipNetMessageV2;
44
use openmina_core::ActionEvent;
55
use serde::{Deserialize, Serialize};
66

7+
/// Actions that can occur within the P2P Network PubSub system.
8+
///
9+
/// Managing pubsub streams, handling incoming and outgoing messages,
10+
/// and maintaining the mesh network topology.
11+
///
12+
/// **Common Fields:**
13+
/// - `peer_id`: The identifier of the peer associated with the action.
14+
/// - `addr`: The connection address of the peer.
15+
/// - `stream_id`: The unique identifier of the stream.
16+
/// - `topic_id`: The identifier of the topic involved in the action.
717
#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
818
pub enum P2pNetworkPubsubAction {
19+
/// Create a new stream, either incoming or outgoing.
20+
///
21+
/// **Fields:**
22+
/// - `incoming`: Indicates if the stream is incoming (`true`) or outgoing (`false`).
23+
/// - `protocol`: The broadcast algorithm used for the stream.
924
NewStream {
1025
incoming: bool,
1126
peer_id: PeerId,
1227
addr: ConnectionAddr,
1328
stream_id: StreamId,
1429
protocol: BroadcastAlgorithm,
1530
},
31+
32+
/// Process incoming raw data from a peer.
33+
///
34+
/// **Fields:**
35+
/// - `data`: The raw data payload received.
36+
/// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
1637
IncomingData {
1738
peer_id: PeerId,
1839
addr: ConnectionAddr,
1940
stream_id: StreamId,
2041
data: Data,
2142
seen_limit: usize,
2243
},
44+
45+
/// Handle a fully decoded message received from a peer.
46+
///
47+
/// **Fields:**
48+
/// - `message`: The decoded protobuf message.
49+
/// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
2350
IncomingMessage {
2451
peer_id: PeerId,
2552
message: pb::Message,
2653
seen_limit: usize,
2754
},
28-
IncomingMessageCleanup {
29-
peer_id: PeerId,
30-
},
31-
Graft {
32-
peer_id: PeerId,
33-
topic_id: String,
34-
},
35-
Prune {
36-
peer_id: PeerId,
37-
topic_id: String,
38-
},
39-
Broadcast {
40-
message: Box<GossipNetMessageV2>,
41-
},
55+
56+
/// Clean up temporary states after processing an incoming message.
57+
IncomingMessageCleanup { peer_id: PeerId },
58+
59+
/// Add a peer to the mesh network for a specific topic.
60+
Graft { peer_id: PeerId, topic_id: String },
61+
62+
/// Remove a peer from the mesh network for a specific topic.
63+
Prune { peer_id: PeerId, topic_id: String },
64+
65+
/// Initiate the broadcasting of a message to all subscribed peers.
66+
///
67+
/// **Fields:**
68+
/// - `message`: The gossip network message to broadcast.
69+
Broadcast { message: Box<GossipNetMessageV2> },
70+
71+
/// Prepare a message for signing before broadcasting.
72+
///
73+
/// **Fields:**
74+
/// - `seqno`: The sequence number of the message.
75+
/// - `author`: The identifier of the peer authoring the message.
76+
/// - `data`: The data payload of the message.
77+
/// - `topic`: The topic under which the message is published.
4278
Sign {
4379
seqno: u64,
4480
author: PeerId,
4581
data: Data,
4682
topic: String,
4783
},
84+
85+
/// An error occured during the signing process.
4886
#[action_event(level = warn, fields(display(author), display(topic)))]
49-
SignError {
50-
author: PeerId,
51-
topic: String,
52-
},
53-
BroadcastSigned {
54-
signature: Data,
55-
},
56-
OutgoingMessage {
57-
peer_id: PeerId,
58-
},
59-
OutgoingMessageClear {
60-
peer_id: PeerId,
61-
},
87+
SignError { author: PeerId, topic: String },
88+
89+
/// Finalize the broadcasting of a signed message by attaching the signature.
90+
///
91+
/// **Fields:**
92+
/// - `signature`: The cryptographic signature of the message.
93+
BroadcastSigned { signature: Data },
94+
95+
/// Prepare an outgoing message to send to a specific peer.
96+
OutgoingMessage { peer_id: PeerId },
97+
98+
/// Clear the outgoing message state for a specific peer after sending.
99+
OutgoingMessageClear { peer_id: PeerId },
100+
101+
/// An error occured during the sending of an outgoing message.
102+
///
103+
/// **Fields:**
104+
/// - `msg`: The protobuf message that failed to send.
62105
#[action_event(level = warn, fields(display(peer_id), debug(msg)))]
63-
OutgoingMessageError {
64-
msg: pb::Rpc,
65-
peer_id: PeerId,
66-
},
67-
OutgoingData {
68-
data: Data,
69-
peer_id: PeerId,
70-
},
106+
OutgoingMessageError { msg: pb::Rpc, peer_id: PeerId },
107+
108+
/// Send encoded data over an outgoing stream to a specific peer.
109+
///
110+
/// **Fields:**
111+
/// - `data`: The encoded data to be sent.
112+
OutgoingData { data: Data, peer_id: PeerId },
71113
}
72114

73115
impl From<P2pNetworkPubsubAction> for crate::P2pAction {

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ impl P2pNetworkPubsubState {
548548
}
549549

550550
/// Processes incoming data from a peer, handling subscriptions, control messages,
551-
/// and message dissemination within the P2P pubsub system.
551+
/// and message broadcasting within the P2P pubsub system.
552552
fn reduce_incoming_data(
553553
&mut self,
554554
peer_id: &PeerId,

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,49 @@ use serde::{Deserialize, Serialize};
1414

1515
pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
1616

17+
/// State of the P2P Network PubSub system.
18+
///
19+
/// This struct maintains information about connected peers, message sequencing,
20+
/// message caching, and topic subscriptions. It handles incoming and outgoing
21+
/// messages, manages the mesh network topology, and ensures efficient message
22+
/// broadcasting across the network.
1723
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
1824
pub struct P2pNetworkPubsubState {
25+
/// State of each connected peer.
1926
pub clients: BTreeMap<PeerId, P2pNetworkPubsubClientState>,
27+
28+
/// Current message sequence number.
29+
///
30+
/// Increments with each new message to ensure proper ordering and uniqueness.
2031
pub seq: u64,
32+
33+
/// Messages awaiting cryptographic signing.
2134
pub to_sign: VecDeque<pb::Message>,
35+
36+
/// Recently seen message identifiers to prevent duplication.
37+
///
38+
/// Keeps a limited history of message signatures to avoid processing
39+
/// the same message multiple times.
2240
pub seen: VecDeque<Vec<u8>>,
41+
42+
/// Cache of published messages for efficient retrieval and broadcasting.
43+
///
44+
/// For quick access and reducing redundant data transmission across peers.
2345
pub mcache: P2pNetworkPubsubMessageCache,
46+
47+
/// Incoming block from a peer, if any.
2448
pub incoming_block: Option<(PeerId, Arc<v2::MinaBlockBlockStableV2>)>,
49+
50+
/// Incoming transactions from peers along with their nonces.
2551
pub incoming_transactions: Vec<(Transaction, u32)>,
52+
53+
/// Incoming snarks from peers along with their nonces.
2654
pub incoming_snarks: Vec<(Snark, u32)>,
55+
56+
/// Topics and their subscribed peers.
2757
pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
58+
59+
/// `iwant` requests, tracking the number of times peers have expressed interest in specific messages.
2860
pub iwant: VecDeque<P2pNetworkPubsubIwantRequestCount>,
2961
}
3062

@@ -86,14 +118,45 @@ impl P2pNetworkPubsubState {
86118
}
87119
}
88120

121+
/// State of a pubsub client connected to a peer.
122+
///
123+
/// This struct maintains essential information about the client's protocol,
124+
/// connection details, message buffers, and caching mechanisms. It facilitates
125+
/// efficient message handling and broadcasting within the pubsub system.
89126
#[derive(Serialize, Deserialize, Debug, Clone)]
90127
pub struct P2pNetworkPubsubClientState {
128+
/// Broadcast algorithm used for this client.
91129
pub protocol: BroadcastAlgorithm,
130+
131+
/// Connection address of the peer.
92132
pub addr: ConnectionAddr,
133+
134+
/// Outgoing stream identifier, if any.
135+
///
136+
/// - `Some(StreamId)`: Indicates an active outgoing stream.
137+
/// - `None`: No outgoing stream is currently established.
93138
pub outgoing_stream_id: Option<StreamId>,
139+
140+
/// Current RPC message being constructed or processed.
141+
///
142+
/// - `subscriptions`: List of subscription options for various topics.
143+
/// - `publish`: Messages queued for publishing.
144+
/// - `control`: Control commands for managing the mesh network.
94145
pub message: pb::Rpc,
146+
147+
/// Cache of recently published messages.
95148
pub cache: P2pNetworkPubsubRecentlyPublishCache,
149+
150+
/// Buffer for incoming data fragments.
151+
///
152+
/// Stores partial data received from peers, facilitating the assembly of complete
153+
/// messages when all fragments are received.
96154
pub buffer: Vec<u8>,
155+
156+
/// Collection of incoming messages from the peer.
157+
///
158+
/// Holds fully decoded `pb::Message` instances received from the peer,
159+
/// ready for further handling such as validation, caching, and broadcasting.
97160
pub incoming_messages: Vec<pb::Message>,
98161
}
99162

0 commit comments

Comments
 (0)