1
+ #![ allow( missing_docs) ]
1
2
use anyhow:: Context ;
2
3
use libp2p:: {
3
4
core:: upgrade:: Version ,
4
5
futures:: StreamExt ,
5
6
gossipsub:: { self , ValidationMode } ,
6
7
noise, ping,
7
- swarm:: { self , DialError } ,
8
+ swarm:: { self , DialError , NetworkBehaviour } ,
8
9
tcp, yamux, Multiaddr , PeerId , Swarm , Transport ,
9
10
} ;
10
11
use mithril_common:: { messages:: RegisterSignatureMessage , StdResult } ;
@@ -13,40 +14,54 @@ use std::{collections::HashMap, time::Duration};
13
14
14
15
use crate :: { PeerError , MITHRIL_SIGNATURES_TOPIC_NAME } ;
15
16
16
- // We create a custom network behaviour that combines gossipsub and ping.
17
- #[ derive( swarm :: NetworkBehaviour ) ]
17
+ /// Custom network behaviour
18
+ #[ derive( NetworkBehaviour ) ]
18
19
pub struct PeerBehaviour {
19
20
gossipsub : gossipsub:: Behaviour ,
20
21
ping : ping:: Behaviour ,
21
22
}
22
23
24
+ /// Peer event that is polled from the swarm
23
25
#[ derive( Debug ) ]
24
26
pub enum PeerEvent {
27
+ /// The peer is listening on an address
25
28
ListeningOnAddr {
29
+ /// Listening multi address
26
30
address : Multiaddr ,
27
31
} ,
32
+ /// The peer established a connection with another peer
28
33
ConnectionEstablished {
34
+ /// Remote peer id
29
35
peer_id : PeerId ,
30
36
} ,
37
+ /// The peer can not connect to another peer
31
38
OutgoingConnectionError {
39
+ /// Remote peer id
32
40
peer_id : Option < PeerId > ,
41
+ /// Error that occurred when dialing the remote peer
33
42
error : DialError ,
34
43
} ,
44
+ /// The peer received a behaviour related event
35
45
Behaviour {
46
+ /// The behaviour event the peer received
36
47
event : PeerBehaviourEvent ,
37
48
} ,
38
49
}
39
50
51
+ /// The topic name of a P2P pubsub
40
52
pub type TopicName = String ;
41
53
54
+ /// A peer in the P2P network
42
55
pub struct Peer {
43
- pub topics : HashMap < TopicName , gossipsub:: IdentTopic > ,
44
- pub swarm : Option < Swarm < PeerBehaviour > > ,
45
- pub addr : Multiaddr ,
56
+ topics : HashMap < TopicName , gossipsub:: IdentTopic > ,
57
+ swarm : Option < Swarm < PeerBehaviour > > ,
58
+ addr : Multiaddr ,
59
+ /// Multi address on which the peer is listening
46
60
pub addr_peer : Option < Multiaddr > ,
47
61
}
48
62
49
63
impl Peer {
64
+ /// Peer factory
50
65
pub fn new ( addr : & Multiaddr ) -> Self {
51
66
Self {
52
67
topics : Self :: build_topics ( ) ,
@@ -63,64 +78,7 @@ impl Peer {
63
78
) ] )
64
79
}
65
80
66
- pub fn publish_signature (
67
- & mut self ,
68
- message : & RegisterSignatureMessage ,
69
- ) -> StdResult < gossipsub:: MessageId > {
70
- let topic = self
71
- . topics
72
- . get ( MITHRIL_SIGNATURES_TOPIC_NAME )
73
- . ok_or ( PeerError :: MissingTopic ( ) )
74
- . with_context ( || "Can not publish signature on invalid topic" ) ?
75
- . to_owned ( ) ;
76
- let data = serde_json:: to_vec ( message)
77
- . with_context ( || "Can not publish signature with invalid format" ) ?;
78
-
79
- let message_id = self
80
- . swarm
81
- . as_mut ( )
82
- . map ( |swarm| swarm. behaviour_mut ( ) . gossipsub . publish ( topic, data) )
83
- . transpose ( )
84
- . with_context ( || "Can not publish signature on P2P pubsub" ) ?
85
- . ok_or ( PeerError :: UnavailableSwarm ( ) )
86
- . with_context ( || "Can not publish signature without swarm" ) ?;
87
- Ok ( message_id. to_owned ( ) )
88
- }
89
-
90
- pub async fn tick_swarm ( & mut self ) -> StdResult < Option < PeerEvent > > {
91
- debug ! ( "Peer: reading next event" ; "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
92
- match self
93
- . swarm
94
- . as_mut ( )
95
- . ok_or ( PeerError :: UnavailableSwarm ( ) )
96
- . with_context ( || "Can not publish signature without swarm" ) ?
97
- . next ( )
98
- . await
99
- {
100
- Some ( swarm:: SwarmEvent :: NewListenAddr { address, .. } ) => {
101
- debug ! ( "Peer: received listening address event" ; "address" => format!( "{address:?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
102
- Ok ( Some ( PeerEvent :: ListeningOnAddr { address } ) )
103
- }
104
- Some ( swarm:: SwarmEvent :: OutgoingConnectionError { peer_id, error, .. } ) => {
105
- debug ! ( "Peer: received outgoing connection error event" ; "error" => format!( "{error:#?}" ) , "remote_peer_id" => format!( "{peer_id:?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
106
- Ok ( Some ( PeerEvent :: OutgoingConnectionError { peer_id, error } ) )
107
- }
108
- Some ( swarm:: SwarmEvent :: ConnectionEstablished { peer_id, .. } ) => {
109
- debug ! ( "Peer: received connection established event" ; "remote_peer_id" => format!( "{peer_id:?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
110
- Ok ( Some ( PeerEvent :: ConnectionEstablished { peer_id } ) )
111
- }
112
- Some ( swarm:: SwarmEvent :: Behaviour ( event) ) => {
113
- debug ! ( "Peer: received behaviour event" ; "event" => format!( "{event:#?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
114
- Ok ( Some ( PeerEvent :: Behaviour { event } ) )
115
- }
116
- Some ( event) => {
117
- debug ! ( "Peer: received other event" ; "event" => format!( "{event:#?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
118
- Ok ( None )
119
- }
120
- _ => Ok ( None ) ,
121
- }
122
- }
123
-
81
+ /// Start the peer
124
82
pub async fn start ( mut self ) -> StdResult < Self > {
125
83
debug ! ( "Peer: starting..." ) ;
126
84
let mut swarm = libp2p:: SwarmBuilder :: with_new_identity ( )
@@ -175,13 +133,75 @@ impl Peer {
175
133
Ok ( self )
176
134
}
177
135
136
+ /// Tick the peer swarm to receive the next event
137
+ pub async fn tick_swarm ( & mut self ) -> StdResult < Option < PeerEvent > > {
138
+ debug ! ( "Peer: reading next event" ; "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
139
+ match self
140
+ . swarm
141
+ . as_mut ( )
142
+ . ok_or ( PeerError :: UnavailableSwarm ( ) )
143
+ . with_context ( || "Can not publish signature without swarm" ) ?
144
+ . next ( )
145
+ . await
146
+ {
147
+ Some ( swarm:: SwarmEvent :: NewListenAddr { address, .. } ) => {
148
+ debug ! ( "Peer: received listening address event" ; "address" => format!( "{address:?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
149
+ Ok ( Some ( PeerEvent :: ListeningOnAddr { address } ) )
150
+ }
151
+ Some ( swarm:: SwarmEvent :: OutgoingConnectionError { peer_id, error, .. } ) => {
152
+ debug ! ( "Peer: received outgoing connection error event" ; "error" => format!( "{error:#?}" ) , "remote_peer_id" => format!( "{peer_id:?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
153
+ Ok ( Some ( PeerEvent :: OutgoingConnectionError { peer_id, error } ) )
154
+ }
155
+ Some ( swarm:: SwarmEvent :: ConnectionEstablished { peer_id, .. } ) => {
156
+ debug ! ( "Peer: received connection established event" ; "remote_peer_id" => format!( "{peer_id:?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
157
+ Ok ( Some ( PeerEvent :: ConnectionEstablished { peer_id } ) )
158
+ }
159
+ Some ( swarm:: SwarmEvent :: Behaviour ( event) ) => {
160
+ debug ! ( "Peer: received behaviour event" ; "event" => format!( "{event:#?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
161
+ Ok ( Some ( PeerEvent :: Behaviour { event } ) )
162
+ }
163
+ Some ( event) => {
164
+ debug ! ( "Peer: received other event" ; "event" => format!( "{event:#?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
165
+ Ok ( None )
166
+ }
167
+ _ => Ok ( None ) ,
168
+ }
169
+ }
170
+
171
+ /// Publish a signature on the P2P pubsub
172
+ pub fn publish_signature (
173
+ & mut self ,
174
+ message : & RegisterSignatureMessage ,
175
+ ) -> StdResult < gossipsub:: MessageId > {
176
+ let topic = self
177
+ . topics
178
+ . get ( MITHRIL_SIGNATURES_TOPIC_NAME )
179
+ . ok_or ( PeerError :: MissingTopic ( ) )
180
+ . with_context ( || "Can not publish signature on invalid topic" ) ?
181
+ . to_owned ( ) ;
182
+ let data = serde_json:: to_vec ( message)
183
+ . with_context ( || "Can not publish signature with invalid format" ) ?;
184
+
185
+ let message_id = self
186
+ . swarm
187
+ . as_mut ( )
188
+ . map ( |swarm| swarm. behaviour_mut ( ) . gossipsub . publish ( topic, data) )
189
+ . transpose ( )
190
+ . with_context ( || "Can not publish signature on P2P pubsub" ) ?
191
+ . ok_or ( PeerError :: UnavailableSwarm ( ) )
192
+ . with_context ( || "Can not publish signature without swarm" ) ?;
193
+ Ok ( message_id. to_owned ( ) )
194
+ }
195
+
196
+ /// Connect to a remote peer
178
197
pub fn dial ( & mut self , addr : Multiaddr ) -> StdResult < ( ) > {
179
198
debug ! ( "Peer: dialing to" ; "address" => format!( "{addr:?}" ) , "local_peer_id" => format!( "{:?}" , self . local_peer_id( ) ) ) ;
180
199
self . swarm . as_mut ( ) . unwrap ( ) . dial ( addr) ?;
181
200
182
201
Ok ( ( ) )
183
202
}
184
203
204
+ /// Get the local peer id (if any)
185
205
pub fn local_peer_id ( & self ) -> Option < PeerId > {
186
206
self . swarm . as_ref ( ) . map ( |s| s. local_peer_id ( ) . to_owned ( ) )
187
207
}
0 commit comments