Skip to content

Commit d5a9c55

Browse files
Sergio Valverdejgraef
authored andcommitted
Add suscribe and publish methods to libp2p network
Implement Gossipsub handling
1 parent df4c838 commit d5a9c55

File tree

5 files changed

+142
-15
lines changed

5 files changed

+142
-15
lines changed

network-albatross/src/network.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,14 +429,14 @@ impl NetworkInterface for Network {
429429
unimplemented!()
430430
}
431431

432-
async fn subscribe<T>(_topic: &T) -> Box<dyn Stream<Item = (T::Item, Self::PeerType)> + Send>
432+
async fn subscribe<T>(&self, _topic: &T) -> Box<dyn Stream<Item = (T::Item, Arc<Self::PeerType>)> + Send>
433433
where
434434
T: Topic + Sync,
435435
{
436436
unimplemented!()
437437
}
438438

439-
async fn publish<T>(_topic: &T, _item: <T as Topic>::Item)
439+
async fn publish<T>(&self, _topic: &T, _item: <T as Topic>::Item) -> Result<(), Self::Error>
440440
where
441441
T: Topic + Sync,
442442
{

network-interface/src/network.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ pub enum NetworkEvent<P> {
2020
}
2121

2222
pub trait Topic {
23-
type Item: Serialize + Deserialize + Send;
23+
type Item: Serialize + Deserialize + Send + Sync + 'static;
2424

25-
fn topic(&self) -> String;
25+
fn topic(&self) -> &'static str;
2626
}
2727

2828
impl<P: Peer> std::fmt::Debug for NetworkEvent<P> {
@@ -71,11 +71,11 @@ pub trait Network: Send + Sync + 'static {
7171
ReceiveFromAll::new(self)
7272
}
7373

74-
async fn subscribe<T>(topic: &T) -> Box<dyn Stream<Item = (T::Item, Self::PeerType)> + Send>
74+
async fn subscribe<T>(&self, topic: &T) -> Box<dyn Stream<Item = (T::Item, Arc<Self::PeerType>)> + Send>
7575
where
7676
T: Topic + Sync;
7777

78-
async fn publish<T: Topic>(topic: &T, item: T::Item)
78+
async fn publish<T: Topic>(&self, topic: &T, item: T::Item) -> Result<(), Self::Error>
7979
where
8080
T: Topic + Sync;
8181

network-libp2p/src/behaviour.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::task::{Context, Poll, Waker};
44

55
use libp2p::{
66
core::either::{EitherError, EitherOutput},
7+
gossipsub::{Gossipsub, GossipsubEvent, GossipsubRpc, MessageAuthenticity},
78
kad::{handler::KademliaHandlerIn as KademliaAction, store::MemoryStore, Kademlia, KademliaEvent, QueryId},
89
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
910
NetworkBehaviour,
@@ -31,14 +32,15 @@ use crate::{
3132
};
3233

3334
pub type NimiqNetworkBehaviourAction =
34-
NetworkBehaviourAction<EitherOutput<EitherOutput<EitherOutput<DiscoveryAction, MessageAction>, LimitAction>, KademliaAction<QueryId>>, NimiqEvent>;
35+
NetworkBehaviourAction<EitherOutput<EitherOutput<EitherOutput<EitherOutput<DiscoveryAction, MessageAction>, LimitAction>, KademliaAction<QueryId>>, GossipsubRpc>, NimiqEvent>;
3536

36-
pub type NimiqNetworkBehaviourError = EitherError<EitherError<EitherError<DiscoveryError, MessageError>, LimitError>, std::io::Error>;
37+
pub type NimiqNetworkBehaviourError = EitherError<EitherError<EitherError<EitherError<DiscoveryError, MessageError>, LimitError>, std::io::Error>, std::io::Error>;
3738

3839
#[derive(Debug)]
3940
pub enum NimiqEvent {
4041
Message(NetworkEvent<Peer>),
4142
Dht(KademliaEvent),
43+
Gossip(GossipsubEvent),
4244
}
4345

4446
impl From<NetworkEvent<Peer>> for NimiqEvent {
@@ -53,13 +55,20 @@ impl From<KademliaEvent> for NimiqEvent {
5355
}
5456
}
5557

58+
impl From<GossipsubEvent> for NimiqEvent {
59+
fn from(event: GossipsubEvent) -> Self {
60+
Self::Gossip(event)
61+
}
62+
}
63+
5664
#[derive(NetworkBehaviour)]
5765
#[behaviour(out_event = "NimiqEvent", poll_method = "poll_event")]
5866
pub struct NimiqBehaviour {
5967
pub discovery: DiscoveryBehaviour,
6068
pub message: MessageBehaviour,
6169
pub limit: LimitBehaviour,
6270
pub kademlia: Kademlia<MemoryStore>,
71+
pub gossipsub: Gossipsub,
6372

6473
#[behaviour(ignore)]
6574
events: VecDeque<NimiqEvent>,
@@ -83,12 +92,14 @@ impl NimiqBehaviour {
8392

8493
let store = MemoryStore::new(peer_id.clone());
8594
let kademlia = Kademlia::with_config(peer_id, store, config.kademlia);
95+
let gossipsub = Gossipsub::new(MessageAuthenticity::Signed(config.keypair), config.gossipsub);
8696

8797
Self {
8898
discovery,
8999
message,
90100
limit,
91101
kademlia,
102+
gossipsub,
92103
events: VecDeque::new(),
93104
waker: None,
94105
}
@@ -164,3 +175,10 @@ impl NetworkBehaviourEventProcess<KademliaEvent> for NimiqBehaviour {
164175
self.emit_event(event);
165176
}
166177
}
178+
179+
impl NetworkBehaviourEventProcess<GossipsubEvent> for NimiqBehaviour {
180+
fn inject_event(&mut self, event: GossipsubEvent) {
181+
log::debug!("NimiqBehaviour::gossipsub_event: {:?}", event);
182+
self.emit_event(event);
183+
}
184+
}

network-libp2p/src/network.rs

Lines changed: 114 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use libp2p::{
1212
core,
1313
core::{muxing::StreamMuxerBox, transport::Boxed},
1414
dns,
15+
gossipsub::{GossipsubConfig, GossipsubEvent, GossipsubMessage, Topic as GossipsubTopic, TopicHash},
1516
identity::Keypair,
1617
kad::{GetRecordOk, KademliaConfig, KademliaEvent, QueryId, QueryResult, Quorum, Record},
1718
noise,
@@ -47,6 +48,7 @@ pub struct Config {
4748
pub message: MessageConfig,
4849
pub limit: LimitConfig,
4950
pub kademlia: KademliaConfig,
51+
pub gossipsub: GossipsubConfig,
5052
}
5153

5254
#[derive(Debug, Error)]
@@ -74,6 +76,9 @@ pub enum NetworkError {
7476

7577
#[error("DHT PutRecord error: {0:?}")]
7678
DhtPutRecord(libp2p::kad::PutRecordError),
79+
80+
#[error("Gossipsub Publish error: {0:?}")]
81+
GossipsubPublish(libp2p::gossipsub::error::PublishError)
7782
}
7883

7984
impl From<libp2p::kad::store::Error> for NetworkError {
@@ -94,8 +99,13 @@ impl From<libp2p::kad::PutRecordError> for NetworkError {
9499
}
95100
}
96101

97-
type NimiqSwarm = Swarm<NimiqBehaviour>;
102+
impl From<libp2p::gossipsub::error::PublishError> for NetworkError {
103+
fn from(e: libp2p::gossipsub::error::PublishError) -> Self {
104+
Self::GossipsubPublish(e)
105+
}
106+
}
98107

108+
type NimiqSwarm = Swarm<NimiqBehaviour>;
99109
#[derive(Debug)]
100110
pub enum NetworkAction {
101111
Dial {
@@ -115,12 +125,27 @@ pub enum NetworkAction {
115125
value: Vec<u8>,
116126
output: oneshot::Sender<Result<(), NetworkError>>,
117127
},
128+
RegisterTopic {
129+
topic_hash: TopicHash,
130+
output: mpsc::Sender<(GossipsubMessage, Arc<Peer>)>,
131+
},
132+
Subscribe {
133+
topic_name: &'static str,
134+
output: oneshot::Sender<TopicHash>,
135+
},
136+
Publish {
137+
topic_name: &'static str,
138+
data: Vec<u8>,
139+
output: oneshot::Sender<Result<(), NetworkError>>,
140+
},
118141
}
119142

120143
#[derive(Default)]
121144
struct TaskState {
122145
dht_puts: HashMap<QueryId, oneshot::Sender<Result<(), NetworkError>>>,
123146
dht_gets: HashMap<QueryId, oneshot::Sender<Result<Vec<u8>, NetworkError>>>,
147+
gossip_sub: HashMap<TopicHash, oneshot::Sender<TopicHash>>,
148+
gossip_topics: HashMap<TopicHash, mpsc::Sender<(GossipsubMessage, Arc<Peer>)>>,
124149
}
125150

126151
pub struct Network {
@@ -274,6 +299,30 @@ impl Network {
274299
_ => {}
275300
}
276301
}
302+
NimiqEvent::Gossip(event) => {
303+
match event {
304+
GossipsubEvent::Message(peer_id, msg_id, msg) => {
305+
log::trace!("Received message {:?} from peer {:?}: {:?}", msg_id, peer_id, msg);
306+
for topic in msg.topics.iter() {
307+
if let Some(output) = state.gossip_topics.get(&topic) {
308+
// let peer = Self::get_peer(peer_id).unwrap();
309+
// output.send((msg, peer));
310+
} else {
311+
log::warn!("Unknown topic hash: {:?}", topic);
312+
}
313+
}
314+
}
315+
GossipsubEvent::Subscribed { peer_id, topic } => {
316+
log::trace!("Peer {:?} subscribed to topic: {:?}", peer_id, topic);
317+
if let Some(output) = state.gossip_sub.remove(&topic) {
318+
output.send(topic).ok();
319+
}
320+
}
321+
GossipsubEvent::Unsubscribed { peer_id, topic } => {
322+
log::trace!("Peer {:?} unsubscribed to topic: {:?}", peer_id, topic);
323+
}
324+
}
325+
}
277326
}
278327
}
279328
_ => {}
@@ -316,6 +365,22 @@ impl Network {
316365
}
317366
}
318367
}
368+
NetworkAction::RegisterTopic { topic_hash, output } => {
369+
state.gossip_topics.insert(topic_hash, output);
370+
}
371+
NetworkAction::Subscribe { topic_name, output } => {
372+
let topic = GossipsubTopic::new(topic_name.into());
373+
if swarm.gossipsub.subscribe(topic.clone()) {
374+
state.gossip_sub.insert(topic.sha256_hash(), output);
375+
} else {
376+
log::warn!("Already subscribed to topic: {:?}", topic_name);
377+
drop(output);
378+
}
379+
}
380+
NetworkAction::Publish { topic_name, data, output } => {
381+
let topic = GossipsubTopic::new(topic_name.into());
382+
output.send(swarm.gossipsub.publish(&topic, data).map_err(Into::into)).ok();
383+
}
319384
}
320385

321386
Ok(())
@@ -344,18 +409,61 @@ impl NetworkInterface for Network {
344409
self.events_tx.subscribe()
345410
}
346411

347-
async fn subscribe<T>(_topic: &T) -> Box<dyn Stream<Item = (T::Item, Self::PeerType)> + Send>
412+
async fn subscribe<T>(&self, topic: &T) -> Box<dyn Stream<Item = (T::Item, Arc<Self::PeerType>)> + Send>
348413
where
349414
T: Topic + Sync,
350415
{
351-
unimplemented!()
416+
let (output_tx, output_rx) = oneshot::channel();
417+
418+
self.action_tx
419+
.lock()
420+
.await
421+
.send(NetworkAction::Subscribe {
422+
topic_name: topic.topic(),
423+
output: output_tx,
424+
})
425+
.await;
426+
427+
let topic_hash = output_rx.await.expect("Already subscribed to topic");
428+
let (tx, rx) = mpsc::channel(16);
429+
430+
self.action_tx
431+
.lock()
432+
.await
433+
.send(NetworkAction::RegisterTopic {
434+
topic_hash,
435+
output: tx,
436+
})
437+
.await;
438+
439+
let test = rx.map(|(msg, peer)|
440+
{
441+
let item = msg.data;
442+
( item , peer)
443+
}).into_inner();
444+
Box::new(test)
352445
}
353446

354-
async fn publish<T>(_topic: &T, _item: <T as Topic>::Item)
447+
async fn publish<T>(&self, topic: &T, item: <T as Topic>::Item) -> Result<(), Self::Error>
355448
where
356449
T: Topic + Sync,
357450
{
358-
unimplemented!()
451+
let (output_tx, output_rx) = oneshot::channel();
452+
453+
let mut buf = vec![];
454+
item.serialize(&mut buf)?;
455+
456+
self.action_tx
457+
.lock()
458+
.await
459+
.send(NetworkAction::Publish {
460+
topic_name: topic.topic(),
461+
data: buf,
462+
output: output_tx,
463+
})
464+
.await?;
465+
466+
output_rx.await?
359467
}
360468

361469
async fn dht_get<K, V>(&self, k: &K) -> Result<V, Self::Error>
@@ -482,6 +590,7 @@ mod tests {
482590
message: Default::default(),
483591
limit: Default::default(),
484592
kademlia: Default::default(),
593+
gossipsub: Default::default(),
485594
}
486595
}
487596

network-mock/src/network.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,14 +236,14 @@ impl Network for MockNetwork {
236236
self.event_tx.subscribe()
237237
}
238238

239-
async fn subscribe<T>(_topic: &T) -> Box<dyn Stream<Item = (T::Item, Self::PeerType)> + Send>
239+
async fn subscribe<T>(&self, _topic: &T) -> Box<dyn Stream<Item = (T::Item, Arc<Self::PeerType>)> + Send>
240240
where
241241
T: Topic + Sync,
242242
{
243243
unimplemented!()
244244
}
245245

246-
async fn publish<T>(_topic: &T, _item: <T as Topic>::Item)
246+
async fn publish<T>(&self, _topic: &T, _item: <T as Topic>::Item) -> Result<(), Self::Error>
247247
where
248248
T: Topic + Sync,
249249
{

0 commit comments

Comments
 (0)