Skip to content
This repository was archived by the owner on Dec 10, 2022. It is now read-only.

Commit 7a66862

Browse files
committed
refactor(node): use DHT ServerExt
1 parent 10a4ef2 commit 7a66862

File tree

2 files changed

+8
-48
lines changed

2 files changed

+8
-48
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/main.rs

Lines changed: 7 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,25 @@ mod motd;
2020

2121
use std::fs::{File, OpenOptions};
2222
use std::io::{ErrorKind, Read, Write};
23-
use std::net::{IpAddr, SocketAddr};
23+
use std::net::SocketAddr;
2424
#[cfg(unix)]
2525
use std::os::unix::fs::OpenOptionsExt;
2626

2727
use failure::Error;
2828
use futures::sync::mpsc;
29-
use futures::{future, Future, Sink, Stream};
29+
use futures::{future, Future, Stream};
3030
use futures::future::Either;
3131
use itertools::Itertools;
3232
use log::LevelFilter;
33-
use tokio::net::{TcpListener, UdpSocket, UdpFramed};
33+
use tokio::net::{TcpListener, UdpSocket};
3434
use tokio::runtime;
3535
use tox::toxcore::crypto_core::*;
36-
use tox::toxcore::dht::codec::{DecodeError, DhtCodec};
3736
use tox::toxcore::dht::server::{Server as UdpServer};
37+
use tox::toxcore::dht::server_ext::{ServerExt as UdpServerExt};
3838
use tox::toxcore::dht::lan_discovery::LanDiscoverySender;
3939
use tox::toxcore::onion::packet::InnerOnionResponse;
4040
use tox::toxcore::tcp::packet::OnionRequest;
41-
use tox::toxcore::tcp::server::{Server as TcpServer, ServerExt};
41+
use tox::toxcore::tcp::server::{Server as TcpServer, ServerExt as TcpServerExt};
4242
use tox::toxcore::stats::Stats;
4343
#[cfg(unix)]
4444
use syslog::Facility;
@@ -225,8 +225,6 @@ fn run_udp(cli_config: &CliConfig, dht_pk: PublicKey, dht_sk: &SecretKey, udp_on
225225

226226
let socket = bind_socket(udp_addr);
227227
let udp_stats = Stats::new();
228-
let codec = DhtCodec::new(udp_stats.clone());
229-
let (sink, stream) = UdpFramed::new(socket, codec).split();
230228

231229
// Create a channel for server to communicate with network
232230
let (tx, rx) = mpsc::channel(DHT_CHANNEL_SIZE);
@@ -240,7 +238,7 @@ fn run_udp(cli_config: &CliConfig, dht_pk: PublicKey, dht_sk: &SecretKey, udp_on
240238
};
241239

242240
let mut server = UdpServer::new(tx, dht_pk, dht_sk.clone());
243-
let counters = Counters::new(tcp_stats, udp_stats);
241+
let counters = Counters::new(tcp_stats, udp_stats.clone());
244242
let motd = Motd::new(cli_config.motd.clone(), counters);
245243
server.set_bootstrap_info(version(), Box::new(move |_| motd.format().as_bytes().to_owned()));
246244
server.enable_lan_discovery(cli_config.lan_discovery_enabled);
@@ -265,47 +263,9 @@ fn run_udp(cli_config: &CliConfig, dht_pk: PublicKey, dht_sk: &SecretKey, udp_on
265263
server.add_initial_bootstrap(node);
266264
}
267265

268-
// The server task asynchronously iterates over and processes each
269-
// incoming packet.
270-
let server_c = server.clone();
271-
let network_reader = stream.then(future::ok).filter(|event|
272-
match event {
273-
Ok(_) => true,
274-
Err(ref e) => {
275-
error!("packet receive error = {:?}", e);
276-
// ignore packet decode errors
277-
e.as_fail().downcast_ref::<DecodeError>().is_none()
278-
}
279-
}
280-
).and_then(|event| event).for_each(move |(packet, addr)| {
281-
trace!("Received packet {:?}", packet);
282-
server_c.handle_packet(packet, addr).or_else(|err| {
283-
error!("Failed to handle packet: {:?}", err);
284-
future::ok(())
285-
})
286-
});
287-
288-
let network_writer = rx
289-
.map_err(|()| unreachable!("rx can't fail"))
290-
// filter out IPv6 packets if node is running in IPv4 mode
291-
.filter(move |&(ref _packet, addr)| !(udp_addr.is_ipv4() && addr.is_ipv6()))
292-
.fold(sink, move |sink, (packet, mut addr)| {
293-
if udp_addr.is_ipv6() {
294-
if let IpAddr::V4(ip) = addr.ip() {
295-
addr = SocketAddr::new(IpAddr::V6(ip.to_ipv6_mapped()), addr.port());
296-
}
297-
}
298-
trace!("Sending packet {:?} to {:?}", packet, addr);
299-
sink.send((packet, addr))
300-
})
301-
// drop sink when rx stream is exhausted
302-
.map(|_sink| ());
303-
304266
info!("Running DHT server on {}", udp_addr);
305267

306-
Either::B(network_reader
307-
.select(network_writer).map(|_| ()).map_err(|(e, _)| e)
308-
.select(server.run().map_err(Error::from)).map(|_| ()).map_err(|(e, _)| e)
268+
Either::B(server.run_socket(socket, rx, udp_stats).map_err(Error::from)
309269
.select(lan_discovery_future).map(|_| ()).map_err(|(e, _)| e)
310270
.join(udp_onion_future).map(|_| ()))
311271
}

0 commit comments

Comments
 (0)