Skip to content

Commit 6b3500b

Browse files
ZyansheepRoman S. Borschel
andauthored
Add another chat example (but it uses tokio instead of async_std) (#1780)
* add tokio floodsub chat example * use swarmbuilder to specify tokio executor * fix comments * Tweak tokio chat example. Co-authored-by: Roman S. Borschel <[email protected]>
1 parent 3e31ea9 commit 6b3500b

File tree

2 files changed

+179
-1
lines changed

2 files changed

+179
-1
lines changed

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ libp2p-websocket = { version = "0.23.1", path = "transports/websocket", optional
9595
[dev-dependencies]
9696
async-std = "1.6.2"
9797
env_logger = "0.7.1"
98-
tokio = { version = "0.2", features = ["io-util", "io-std", "stream"] }
98+
tokio = { version = "0.2", features = ["io-util", "io-std", "stream", "macros"] }
9999

100100
[workspace]
101101
members = [
@@ -123,3 +123,7 @@ members = [
123123
"transports/websocket",
124124
"transports/wasm-ext"
125125
]
126+
127+
[[example]]
128+
name = "chat-tokio"
129+
required-features = ["tcp-tokio", "mdns-tokio"]

examples/chat-tokio.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright 2018 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
//! A basic chat application demonstrating libp2p with the mDNS and floodsub protocols
22+
//! using tokio for all asynchronous tasks and I/O. In order for all used libp2p
23+
//! crates to use tokio, it enables tokio-specific features for some crates.
24+
//!
25+
//! The example is run per node as follows:
26+
//!
27+
//! ```sh
28+
//! cargo run --example chat-tokio --features="tcp-tokio mdns-tokio"
29+
//! ```
30+
//!
31+
//! Alternatively, to run with the minimal set of features and crates:
32+
//!
33+
//! ```sh
34+
//!cargo run --example chat-tokio \\
35+
//! --no-default-features \\
36+
//! --features="floodsub mplex noise tcp-tokio mdns-tokio"
37+
//! ```
38+
39+
use futures::prelude::*;
40+
use libp2p::{
41+
Multiaddr,
42+
NetworkBehaviour,
43+
PeerId,
44+
Swarm,
45+
Transport,
46+
core::upgrade,
47+
identity,
48+
floodsub::{self, Floodsub, FloodsubEvent},
49+
// `TokioMdns` is available through the `mdns-tokio` feature.
50+
mdns::{TokioMdns, MdnsEvent},
51+
mplex,
52+
noise,
53+
swarm::{NetworkBehaviourEventProcess, SwarmBuilder},
54+
// `TokioTcpConfig` is available through the `tcp-tokio` feature.
55+
tcp::TokioTcpConfig,
56+
};
57+
use std::error::Error;
58+
use tokio::io::{self, AsyncBufReadExt};
59+
60+
/// The `tokio::main` attribute sets up a tokio runtime.
61+
#[tokio::main]
62+
async fn main() -> Result<(), Box<dyn Error>> {
63+
env_logger::init();
64+
65+
// Create a random PeerId
66+
let id_keys = identity::Keypair::generate_ed25519();
67+
let peer_id = PeerId::from(id_keys.public());
68+
println!("Local peer id: {:?}", peer_id);
69+
70+
// Create a keypair for authenticated encryption of the transport.
71+
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
72+
.into_authentic(&id_keys)
73+
.expect("Signing libp2p-noise static DH keypair failed.");
74+
75+
// Create a tokio-based TCP transport use noise for authenticated
76+
// encryption and Mplex for multiplexing of substreams on a TCP stream.
77+
let transport = TokioTcpConfig::new().nodelay(true)
78+
.upgrade(upgrade::Version::V1)
79+
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
80+
.multiplex(mplex::MplexConfig::new());
81+
82+
// Create a Floodsub topic
83+
let floodsub_topic = floodsub::Topic::new("chat");
84+
85+
// We create a custom network behaviour that combines floodsub and mDNS.
86+
// The derive generates a delegating `NetworkBehaviour` impl which in turn
87+
// requires the implementations of `NetworkBehaviourEventProcess` for
88+
// the events of each behaviour.
89+
#[derive(NetworkBehaviour)]
90+
struct MyBehaviour {
91+
floodsub: Floodsub,
92+
mdns: TokioMdns,
93+
}
94+
95+
impl NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour {
96+
// Called when `floodsub` produces an event.
97+
fn inject_event(&mut self, message: FloodsubEvent) {
98+
if let FloodsubEvent::Message(message) = message {
99+
println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source);
100+
}
101+
}
102+
}
103+
104+
impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
105+
// Called when `mdns` produces an event.
106+
fn inject_event(&mut self, event: MdnsEvent) {
107+
match event {
108+
MdnsEvent::Discovered(list) =>
109+
for (peer, _) in list {
110+
self.floodsub.add_node_to_partial_view(peer);
111+
}
112+
MdnsEvent::Expired(list) =>
113+
for (peer, _) in list {
114+
if !self.mdns.has_node(&peer) {
115+
self.floodsub.remove_node_from_partial_view(&peer);
116+
}
117+
}
118+
}
119+
}
120+
}
121+
122+
// Create a Swarm to manage peers and events.
123+
let mut swarm = {
124+
let mdns = TokioMdns::new()?;
125+
let mut behaviour = MyBehaviour {
126+
floodsub: Floodsub::new(peer_id.clone()),
127+
mdns,
128+
};
129+
130+
behaviour.floodsub.subscribe(floodsub_topic.clone());
131+
132+
SwarmBuilder::new(transport, behaviour, peer_id)
133+
// We want the connection background tasks to be spawned
134+
// onto the tokio runtime.
135+
.executor(Box::new(|fut| { tokio::spawn(fut); }))
136+
.build()
137+
};
138+
139+
// Reach out to another node if specified
140+
if let Some(to_dial) = std::env::args().nth(1) {
141+
let addr: Multiaddr = to_dial.parse()?;
142+
Swarm::dial_addr(&mut swarm, addr)?;
143+
println!("Dialed {:?}", to_dial)
144+
}
145+
146+
// Read full lines from stdin
147+
let mut stdin = io::BufReader::new(io::stdin()).lines();
148+
149+
// Listen on all interfaces and whatever port the OS assigns
150+
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?;
151+
152+
// Kick it off
153+
let mut listening = false;
154+
loop {
155+
let to_publish = {
156+
tokio::select! {
157+
line = stdin.try_next() => Some((floodsub_topic.clone(), line?.expect("Stdin closed"))),
158+
event = swarm.next() => {
159+
println!("New Event: {:?}", event);
160+
None
161+
}
162+
}
163+
};
164+
if let Some((topic, line)) = to_publish {
165+
swarm.floodsub.publish(topic, line.as_bytes());
166+
}
167+
if !listening {
168+
for addr in Swarm::listeners(&swarm) {
169+
println!("Listening on {:?}", addr);
170+
listening = true;
171+
}
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)