@@ -28,85 +28,80 @@ pub fn channel() -> (GossipSender, GossipReceiver) {
2828 ( sender, UnboundedReceiverStream :: new ( receiver) )
2929}
3030
31- pub fn start_incoming_processor (
31+ pub fn start_inbound_gossip_handler (
3232 peer_id : PeerId ,
33- mut reader : BufReader < ReadHalf < Box < NegotiatedSubstream > > > ,
34- incoming_tx : GossipSender ,
35- internal_event_sender : InternalEventSender ,
33+ mut inbound_gossip_rx : BufReader < ReadHalf < Box < NegotiatedSubstream > > > ,
34+ inbound_gossip_tx : GossipSender ,
35+ internal_event_tx : InternalEventSender ,
3636) {
3737 tokio:: spawn ( async move {
38- let mut msg_buf = vec ! [ 0u8 ; MSG_BUFFER_LEN ] ;
38+ let mut buf = vec ! [ 0u8 ; MSG_BUFFER_LEN ] ;
3939
4040 loop {
41- if let Some ( len) = ( & mut reader) . read ( & mut msg_buf) . await . ok ( ) . filter ( |len| * len > 0 ) {
42- if incoming_tx. send ( msg_buf[ ..len] . to_vec ( ) ) . is_err ( ) {
43- debug ! ( "gossip-in: receiver dropped locally." ) ;
41+ if let Some ( len) = ( & mut inbound_gossip_rx)
42+ . read ( & mut buf)
43+ . await
44+ . ok ( )
45+ . filter ( |len| * len > 0 )
46+ {
47+ if inbound_gossip_tx. send ( buf[ ..len] . to_vec ( ) ) . is_err ( ) {
48+ debug ! ( "Terminating gossip protocol with {}." , alias!( peer_id) ) ;
4449
45- // The receiver of this channel was dropped, maybe due to a shutdown. There is nothing we can do
46- // to salvage this situation, hence we drop the connection.
4750 break ;
4851 }
4952 } else {
50- debug ! ( "gossip-in: stream closed remotely." ) ;
53+ debug ! ( "Peer {} terminated gossip protocol." , alias! ( peer_id ) ) ;
5154
52- // NB: The network service will not shut down before it has received the `ProtocolDropped` event
53- // from all once connected peers, hence if the following send fails, then it
54- // must be considered a bug.
55+ // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
56+ // host) through the worker dependencies, hence this can never panic.
57+ internal_event_tx
58+ . send ( InternalEvent :: ProtocolStopped { peer_id } )
59+ . expect ( "send internal event" ) ;
5560
5661 break ;
5762 }
5863 }
5964
60- // Ignore send errors.
61- let _ = internal_event_sender. send ( InternalEvent :: ProtocolStopped { peer_id } ) ;
62-
63- // Reasons why this task might end:
64- // (1) The remote dropped the TCP connection.
65- // (2) The local dropped the gossip_in receiver channel.
66-
67- debug ! ( "gossip-in: exiting gossip-in processor for {}." , alias!( peer_id) ) ;
65+ trace ! ( "Dropping gossip stream reader for {}." , alias!( peer_id) ) ;
6866 } ) ;
6967}
7068
71- pub fn start_outgoing_processor (
69+ pub fn start_outbound_gossip_handler (
7270 peer_id : PeerId ,
73- mut writer : BufWriter < WriteHalf < Box < NegotiatedSubstream > > > ,
74- outgoing_rx : GossipReceiver ,
75- internal_event_sender : InternalEventSender ,
71+ mut outbound_gossip_tx : BufWriter < WriteHalf < Box < NegotiatedSubstream > > > ,
72+ outbound_gossip_rx : GossipReceiver ,
73+ internal_event_tx : InternalEventSender ,
7674) {
7775 tokio:: spawn ( async move {
78- let mut outgoing_gossip_receiver = outgoing_rx . fuse ( ) ;
76+ let mut outbound_gossip_rx = outbound_gossip_rx . fuse ( ) ;
7977
8078 // If the gossip sender dropped we end the connection.
81- while let Some ( message) = outgoing_gossip_receiver . next ( ) . await {
82- // NB : Instead of polling another shutdown channel, we use an empty message
79+ while let Some ( message) = outbound_gossip_rx . next ( ) . await {
80+ // Note : Instead of polling another shutdown channel, we use an empty message
8381 // to signal that we want to end the connection. We use this "trick" whenever the network
8482 // receives the `DisconnectPeer` command to enforce that the connection will be dropped.
85-
8683 if message. is_empty ( ) {
87- debug ! ( "gossip-out: received shutdown message." ) ;
84+ debug ! (
85+ "Terminating gossip protocol with {} (received shutdown signal)." ,
86+ alias!( peer_id)
87+ ) ;
88+
89+ // Panic: we made sure that the sender (network host) is always dropped before the receiver (service
90+ // host) through the worker dependencies, hence this can never panic.
91+ internal_event_tx
92+ . send ( InternalEvent :: ProtocolStopped { peer_id } )
93+ . expect ( "send internal event" ) ;
8894
89- // NB: The network service will not shut down before it has received the `ConnectionDropped` event
90- // from all once connected peers, hence if the following send fails, then it
91- // must be considered a bug.
9295 break ;
93- }
96+ } else if ( & mut outbound_gossip_tx) . write_all ( & message) . await . is_err ( )
97+ || ( & mut outbound_gossip_tx) . flush ( ) . await . is_err ( )
98+ {
99+ debug ! ( "Peer {} terminated gossip protocol." , alias!( peer_id) ) ;
94100
95- // If sending to the stream fails we end the connection.
96- // TODO: buffer for x milliseconds before flushing.
97- if ( & mut writer) . write_all ( & message) . await . is_err ( ) || ( & mut writer) . flush ( ) . await . is_err ( ) {
98- debug ! ( "gossip-out: stream closed remotely" ) ;
99101 break ;
100102 }
101103 }
102104
103- // Ignore send errors.
104- let _ = internal_event_sender. send ( InternalEvent :: ProtocolStopped { peer_id } ) ;
105-
106- // Reasons why this task might end:
107- // (1) The local send the shutdown message (len = 0)
108- // (2) The remote dropped the TCP connection.
109-
110- debug ! ( "gossip-out: exiting gossip-out processor for {}." , alias!( peer_id) ) ;
105+ trace ! ( "Dropping gossip stream writer for {}." , alias!( peer_id) ) ;
111106 } ) ;
112107}
0 commit comments