@@ -20,25 +20,25 @@ mod motd;
20
20
21
21
use std:: fs:: { File , OpenOptions } ;
22
22
use std:: io:: { ErrorKind , Read , Write } ;
23
- use std:: net:: { IpAddr , SocketAddr } ;
23
+ use std:: net:: SocketAddr ;
24
24
#[ cfg( unix) ]
25
25
use std:: os:: unix:: fs:: OpenOptionsExt ;
26
26
27
27
use failure:: Error ;
28
28
use futures:: sync:: mpsc;
29
- use futures:: { future, Future , Sink , Stream } ;
29
+ use futures:: { future, Future , Stream } ;
30
30
use futures:: future:: Either ;
31
31
use itertools:: Itertools ;
32
32
use log:: LevelFilter ;
33
- use tokio:: net:: { TcpListener , UdpSocket , UdpFramed } ;
33
+ use tokio:: net:: { TcpListener , UdpSocket } ;
34
34
use tokio:: runtime;
35
35
use tox:: toxcore:: crypto_core:: * ;
36
- use tox:: toxcore:: dht:: codec:: { DecodeError , DhtCodec } ;
37
36
use tox:: toxcore:: dht:: server:: { Server as UdpServer } ;
37
+ use tox:: toxcore:: dht:: server_ext:: { ServerExt as UdpServerExt } ;
38
38
use tox:: toxcore:: dht:: lan_discovery:: LanDiscoverySender ;
39
39
use tox:: toxcore:: onion:: packet:: InnerOnionResponse ;
40
40
use tox:: toxcore:: tcp:: packet:: OnionRequest ;
41
- use tox:: toxcore:: tcp:: server:: { Server as TcpServer , ServerExt } ;
41
+ use tox:: toxcore:: tcp:: server:: { Server as TcpServer , ServerExt as TcpServerExt } ;
42
42
use tox:: toxcore:: stats:: Stats ;
43
43
#[ cfg( unix) ]
44
44
use syslog:: Facility ;
@@ -225,8 +225,6 @@ fn run_udp(cli_config: &CliConfig, dht_pk: PublicKey, dht_sk: &SecretKey, udp_on
225
225
226
226
let socket = bind_socket ( udp_addr) ;
227
227
let udp_stats = Stats :: new ( ) ;
228
- let codec = DhtCodec :: new ( udp_stats. clone ( ) ) ;
229
- let ( sink, stream) = UdpFramed :: new ( socket, codec) . split ( ) ;
230
228
231
229
// Create a channel for server to communicate with network
232
230
let ( tx, rx) = mpsc:: channel ( DHT_CHANNEL_SIZE ) ;
@@ -239,19 +237,19 @@ fn run_udp(cli_config: &CliConfig, dht_pk: PublicKey, dht_sk: &SecretKey, udp_on
239
237
Either :: B ( future:: empty ( ) )
240
238
} ;
241
239
242
- let mut server = UdpServer :: new ( tx, dht_pk, dht_sk. clone ( ) ) ;
243
- let counters = Counters :: new ( tcp_stats, udp_stats) ;
240
+ let mut udp_server = UdpServer :: new ( tx, dht_pk, dht_sk. clone ( ) ) ;
241
+ let counters = Counters :: new ( tcp_stats, udp_stats. clone ( ) ) ;
244
242
let motd = Motd :: new ( cli_config. motd . clone ( ) , counters) ;
245
- server . set_bootstrap_info ( version ( ) , Box :: new ( move |_| motd. format ( ) . as_bytes ( ) . to_owned ( ) ) ) ;
246
- server . enable_lan_discovery ( cli_config. lan_discovery_enabled ) ;
247
- server . set_tcp_onion_sink ( udp_onion. tx ) ;
248
- server . enable_ipv6_mode ( udp_addr. is_ipv6 ( ) ) ;
243
+ udp_server . set_bootstrap_info ( version ( ) , Box :: new ( move |_| motd. format ( ) . as_bytes ( ) . to_owned ( ) ) ) ;
244
+ udp_server . enable_lan_discovery ( cli_config. lan_discovery_enabled ) ;
245
+ udp_server . set_tcp_onion_sink ( udp_onion. tx ) ;
246
+ udp_server . enable_ipv6_mode ( udp_addr. is_ipv6 ( ) ) ;
249
247
250
- let server_c = server . clone ( ) ;
248
+ let udp_server_c = udp_server . clone ( ) ;
251
249
let udp_onion_future = udp_onion. rx
252
250
. map_err ( |( ) | unreachable ! ( "rx can't fail" ) )
253
251
. for_each ( move |( onion_request, addr) |
254
- server_c . handle_tcp_onion_request ( onion_request, addr) . or_else ( |err| {
252
+ udp_server_c . handle_tcp_onion_request ( onion_request, addr) . or_else ( |err| {
255
253
warn ! ( "Failed to handle TCP onion request: {:?}" , err) ;
256
254
future:: ok ( ( ) )
257
255
} )
@@ -262,50 +260,12 @@ fn run_udp(cli_config: &CliConfig, dht_pk: PublicKey, dht_sk: &SecretKey, udp_on
262
260
}
263
261
264
262
for node in cli_config. bootstrap_nodes . iter ( ) . flat_map ( |node| node. resolve ( ) ) {
265
- server . add_initial_bootstrap ( node) ;
263
+ udp_server . add_initial_bootstrap ( node) ;
266
264
}
267
265
268
- // The server task asynchronously iterates over and processes each
269
- // incoming packet.
270
- let server_c = server. clone ( ) ;
271
- let network_reader = stream. then ( future:: ok) . filter ( |event|
272
- match event {
273
- Ok ( _) => true ,
274
- Err ( ref e) => {
275
- error ! ( "packet receive error = {:?}" , e) ;
276
- // ignore packet decode errors
277
- e. as_fail ( ) . downcast_ref :: < DecodeError > ( ) . is_none ( )
278
- }
279
- }
280
- ) . and_then ( |event| event) . for_each ( move |( packet, addr) | {
281
- trace ! ( "Received packet {:?}" , packet) ;
282
- server_c. handle_packet ( packet, addr) . or_else ( |err| {
283
- error ! ( "Failed to handle packet: {:?}" , err) ;
284
- future:: ok ( ( ) )
285
- } )
286
- } ) ;
287
-
288
- let network_writer = rx
289
- . map_err ( |( ) | unreachable ! ( "rx can't fail" ) )
290
- // filter out IPv6 packets if node is running in IPv4 mode
291
- . filter ( move |& ( ref _packet, addr) | !( udp_addr. is_ipv4 ( ) && addr. is_ipv6 ( ) ) )
292
- . fold ( sink, move |sink, ( packet, mut addr) | {
293
- if udp_addr. is_ipv6 ( ) {
294
- if let IpAddr :: V4 ( ip) = addr. ip ( ) {
295
- addr = SocketAddr :: new ( IpAddr :: V6 ( ip. to_ipv6_mapped ( ) ) , addr. port ( ) ) ;
296
- }
297
- }
298
- trace ! ( "Sending packet {:?} to {:?}" , packet, addr) ;
299
- sink. send ( ( packet, addr) )
300
- } )
301
- // drop sink when rx stream is exhausted
302
- . map ( |_sink| ( ) ) ;
303
-
304
266
info ! ( "Running DHT server on {}" , udp_addr) ;
305
267
306
- Either :: B ( network_reader
307
- . select ( network_writer) . map ( |_| ( ) ) . map_err ( |( e, _) | e)
308
- . select ( server. run ( ) . map_err ( Error :: from) ) . map ( |_| ( ) ) . map_err ( |( e, _) | e)
268
+ Either :: B ( udp_server. run_socket ( socket, rx, udp_stats) . map_err ( Error :: from)
309
269
. select ( lan_discovery_future) . map ( |_| ( ) ) . map_err ( |( e, _) | e)
310
270
. join ( udp_onion_future) . map ( |_| ( ) ) )
311
271
}
0 commit comments