Skip to content

Commit 878c49f

Browse files
authored
swarm/src/behaviour: Deprecate NetworkBehaviourEventProcess (#2784)
In preparation for #2751.
1 parent 0e5a25d commit 878c49f

File tree

9 files changed

+267
-164
lines changed

9 files changed

+267
-164
lines changed

examples/chat-tokio.rs

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use libp2p::{
4444
mdns::{Mdns, MdnsEvent},
4545
mplex,
4646
noise,
47-
swarm::{dial_opts::DialOpts, NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
47+
swarm::{SwarmBuilder, SwarmEvent},
4848
// `TokioTcpTransport` is available through the `tcp-tokio` feature.
4949
tcp::TokioTcpTransport,
5050
Multiaddr,
@@ -82,47 +82,29 @@ async fn main() -> Result<(), Box<dyn Error>> {
8282
// Create a Floodsub topic
8383
let floodsub_topic = floodsub::Topic::new("chat");
8484

85-
// We create a custom network behaviour that combines floodsub and mDNS.
86-
// The derive generates a delegating `NetworkBehaviour` impl which in turn
87-
// requires the implementations of `NetworkBehaviourEventProcess` for
88-
// the events of each behaviour.
85+
// We create a custom behaviour that combines floodsub and mDNS.
86+
// The derive generates a delegating `NetworkBehaviour` impl.
8987
#[derive(NetworkBehaviour)]
90-
#[behaviour(event_process = true)]
88+
#[behaviour(out_event = "MyBehaviourEvent")]
9189
struct MyBehaviour {
9290
floodsub: Floodsub,
9391
mdns: Mdns,
9492
}
9593

96-
impl NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour {
97-
// Called when `floodsub` produces an event.
98-
fn inject_event(&mut self, message: FloodsubEvent) {
99-
if let FloodsubEvent::Message(message) = message {
100-
println!(
101-
"Received: '{:?}' from {:?}",
102-
String::from_utf8_lossy(&message.data),
103-
message.source
104-
);
105-
}
94+
enum MyBehaviourEvent {
95+
Floodsub(FloodsubEvent),
96+
Mdns(MdnsEvent),
97+
}
98+
99+
impl From<FloodsubEvent> for MyBehaviourEvent {
100+
fn from(event: FloodsubEvent) -> Self {
101+
MyBehaviourEvent::Floodsub(event)
106102
}
107103
}
108104

109-
impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
110-
// Called when `mdns` produces an event.
111-
fn inject_event(&mut self, event: MdnsEvent) {
112-
match event {
113-
MdnsEvent::Discovered(list) => {
114-
for (peer, _) in list {
115-
self.floodsub.add_node_to_partial_view(peer);
116-
}
117-
}
118-
MdnsEvent::Expired(list) => {
119-
for (peer, _) in list {
120-
if !self.mdns.has_node(&peer) {
121-
self.floodsub.remove_node_from_partial_view(&peer);
122-
}
123-
}
124-
}
125-
}
105+
impl From<MdnsEvent> for MyBehaviourEvent {
106+
fn from(event: MdnsEvent) -> Self {
107+
MyBehaviourEvent::Mdns(event)
126108
}
127109
}
128110

@@ -166,8 +148,36 @@ async fn main() -> Result<(), Box<dyn Error>> {
166148
swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes());
167149
}
168150
event = swarm.select_next_some() => {
169-
if let SwarmEvent::NewListenAddr { address, .. } = event {
170-
println!("Listening on {:?}", address);
151+
match event {
152+
SwarmEvent::NewListenAddr { address, .. } => {
153+
println!("Listening on {:?}", address);
154+
}
155+
SwarmEvent::Behaviour(MyBehaviourEvent::Floodsub(event)) => {
156+
if let FloodsubEvent::Message(message) = event {
157+
println!(
158+
"Received: '{:?}' from {:?}",
159+
String::from_utf8_lossy(&message.data),
160+
message.source
161+
);
162+
}
163+
}
164+
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(event)) => {
165+
match event {
166+
MdnsEvent::Discovered(list) => {
167+
for (peer, _) in list {
168+
swarm.behaviour_mut().floodsub.add_node_to_partial_view(peer);
169+
}
170+
}
171+
MdnsEvent::Expired(list) => {
172+
for (peer, _) in list {
173+
if !swarm.behaviour().mdns.has_node(&peer) {
174+
swarm.behaviour_mut().floodsub.remove_node_from_partial_view(&peer);
175+
}
176+
}
177+
}
178+
}
179+
}
180+
_ => {}
171181
}
172182
}
173183
}

examples/chat.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
7979
let floodsub_topic = floodsub::Topic::new("chat");
8080

8181
// We create a custom network behaviour that combines floodsub and mDNS.
82-
// In the future, we want to improve libp2p to make this easier to do.
83-
// Use the derive to generate delegating NetworkBehaviour impl and require the
84-
// NetworkBehaviourEventProcess implementations below.
82+
// Use the derive to generate delegating NetworkBehaviour impl.
8583
#[derive(NetworkBehaviour)]
8684
#[behaviour(out_event = "OutEvent")]
8785
struct MyBehaviour {

examples/distributed-key-value-store.rs

Lines changed: 49 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ use libp2p::kad::{
5050
use libp2p::{
5151
development_transport, identity,
5252
mdns::{Mdns, MdnsConfig, MdnsEvent},
53-
swarm::{NetworkBehaviourEventProcess, SwarmEvent},
53+
swarm::SwarmEvent,
5454
NetworkBehaviour, PeerId, Swarm,
5555
};
5656
use std::error::Error;
@@ -68,28 +68,60 @@ async fn main() -> Result<(), Box<dyn Error>> {
6868

6969
// We create a custom network behaviour that combines Kademlia and mDNS.
7070
#[derive(NetworkBehaviour)]
71-
#[behaviour(event_process = true)]
71+
#[behaviour(out_event = "MyBehaviourEvent")]
7272
struct MyBehaviour {
7373
kademlia: Kademlia<MemoryStore>,
7474
mdns: Mdns,
7575
}
7676

77-
impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
78-
// Called when `mdns` produces an event.
79-
fn inject_event(&mut self, event: MdnsEvent) {
80-
if let MdnsEvent::Discovered(list) = event {
81-
for (peer_id, multiaddr) in list {
82-
self.kademlia.add_address(&peer_id, multiaddr);
83-
}
84-
}
77+
enum MyBehaviourEvent {
78+
Kademlia(KademliaEvent),
79+
Mdns(MdnsEvent),
80+
}
81+
82+
impl From<KademliaEvent> for MyBehaviourEvent {
83+
fn from(event: KademliaEvent) -> Self {
84+
MyBehaviourEvent::Kademlia(event)
85+
}
86+
}
87+
88+
impl From<MdnsEvent> for MyBehaviourEvent {
89+
fn from(event: MdnsEvent) -> Self {
90+
MyBehaviourEvent::Mdns(event)
8591
}
8692
}
8793

88-
impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
89-
// Called when `kademlia` produces an event.
90-
fn inject_event(&mut self, message: KademliaEvent) {
91-
match message {
92-
KademliaEvent::OutboundQueryCompleted { result, .. } => match result {
94+
// Create a swarm to manage peers and events.
95+
let mut swarm = {
96+
// Create a Kademlia behaviour.
97+
let store = MemoryStore::new(local_peer_id);
98+
let kademlia = Kademlia::new(local_peer_id, store);
99+
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
100+
let behaviour = MyBehaviour { kademlia, mdns };
101+
Swarm::new(transport, behaviour, local_peer_id)
102+
};
103+
104+
// Read full lines from stdin
105+
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();
106+
107+
// Listen on all interfaces and whatever port the OS assigns.
108+
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
109+
110+
// Kick it off.
111+
loop {
112+
select! {
113+
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
114+
event = swarm.select_next_some() => match event {
115+
SwarmEvent::NewListenAddr { address, .. } => {
116+
println!("Listening in {:?}", address);
117+
},
118+
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(MdnsEvent::Discovered(list))) => {
119+
for (peer_id, multiaddr) in list {
120+
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
121+
}
122+
}
123+
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {
124+
match result {
93125
QueryResult::GetProviders(Ok(ok)) => {
94126
for peer in ok.providers {
95127
println!(
@@ -137,38 +169,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
137169
eprintln!("Failed to put provider record: {:?}", err);
138170
}
139171
_ => {}
140-
},
141-
_ => {}
172+
}
142173
}
174+
_ => {}
143175
}
144-
}
145-
146-
// Create a swarm to manage peers and events.
147-
let mut swarm = {
148-
// Create a Kademlia behaviour.
149-
let store = MemoryStore::new(local_peer_id);
150-
let kademlia = Kademlia::new(local_peer_id, store);
151-
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
152-
let behaviour = MyBehaviour { kademlia, mdns };
153-
Swarm::new(transport, behaviour, local_peer_id)
154-
};
155-
156-
// Read full lines from stdin
157-
let mut stdin = io::BufReader::new(io::stdin()).lines().fuse();
158-
159-
// Listen on all interfaces and whatever port the OS assigns.
160-
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
161-
162-
// Kick it off.
163-
loop {
164-
select! {
165-
line = stdin.select_next_some() => handle_input_line(&mut swarm.behaviour_mut().kademlia, line.expect("Stdin not to close")),
166-
event = swarm.select_next_some() => match event {
167-
SwarmEvent::NewListenAddr { address, .. } => {
168-
println!("Listening in {:?}", address);
169-
},
170-
_ => {}
171-
}
172176
}
173177
}
174178
}

0 commit comments

Comments
 (0)