Skip to content

Commit f58df6c

Browse files
committed
Close #529: Make peer connection flush cancellable
Investigation has shown that connection file descriptors could leak in situations where both peers disconnected with data left on the socket. While it's not entirely clear how this happened, it seems that the select was blocked while flushing. To alleviate this, make the flush itself a branch on the select, and therefore cancellable. Signed-off-by: Lee Smet <lee.smet@hotmail.com>
1 parent 78dcb1a commit f58df6c

File tree

1 file changed

+58
-38
lines changed

1 file changed

+58
-38
lines changed

mycelium/src/peer.rs

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,18 @@ impl Peer {
9292

9393
// Framed for peer
9494
// Used to send and receive packets from a TCP stream
95-
let mut framed = Framed::new(connection, packet::Codec::new());
95+
let framed = Framed::with_capacity(connection, packet::Codec::new(), 128 << 10);
96+
let (mut sink, mut stream) = framed.split();
9697

9798
{
9899
let peer = peer.clone();
99100

100101
tokio::spawn(async move {
102+
let mut needs_flush = false;
101103
loop {
102104
select! {
103105
// Received over the TCP stream
104-
frame = framed.next() => {
106+
frame = stream.next() => {
105107
match frame {
106108
Some(Ok(packet)) => {
107109
match packet {
@@ -135,60 +137,78 @@ impl Peer {
135137
}
136138
}
137139

138-
Some(packet) = from_routing_data.recv() => {
139-
if let Err(e) = framed.feed(Packet::DataPacket(packet)).await {
140-
error!("Failed to feed data packet to connection: {e}");
141-
break
142-
}
140+
rv = from_routing_data.recv(), if !needs_flush => {
141+
match rv {
142+
None => break,
143+
Some(packet) => {
144+
145+
needs_flush = true;
143146

144-
for _ in 1..PACKET_COALESCE_WINDOW {
145-
// There can be 2 cases of errors here, empty channel and no more
146-
// senders. In both cases we don't really care at this point.
147-
if let Ok(packet) = from_routing_data.try_recv() {
148-
if let Err(e) = framed.feed(Packet::DataPacket(packet)).await {
147+
if let Err(e) = sink.feed(Packet::DataPacket(packet)).await {
149148
error!("Failed to feed data packet to connection: {e}");
150149
break
151150
}
152-
trace!("Instantly queued ready packet to transfer to peer");
153-
} else {
154-
// No packets ready, flush currently buffered ones
155-
break
156-
}
157-
}
158151

159-
if let Err(e) = framed.flush().await {
160-
error!("Failed to flush buffered peer connection data packets: {e}");
161-
break
152+
153+
for _ in 1..PACKET_COALESCE_WINDOW {
154+
// There can be 2 cases of errors here, empty channel and no more
155+
// senders. In both cases we don't really care at this point.
156+
if let Ok(packet) = from_routing_data.try_recv() {
157+
if let Err(e) = sink.feed(Packet::DataPacket(packet)).await {
158+
error!("Failed to feed data packet to connection: {e}");
159+
break
160+
}
161+
trace!("Instantly queued ready packet to transfer to peer");
162+
} else {
163+
// No packets ready, flush currently buffered ones
164+
break
165+
}
166+
}
167+
}
162168
}
163169
}
164170

165-
Some(packet) = from_routing_control.recv() => {
166-
if let Err(e) = framed.feed(Packet::ControlPacket(packet)).await {
167-
error!("Failed to feed control packet to connection: {e}");
168-
break
169-
}
171+
rv = from_routing_control.recv(), if !needs_flush => {
172+
match rv {
173+
None => break,
174+
Some(packet) => {
170175

171-
for _ in 1..PACKET_COALESCE_WINDOW {
172-
// There can be 2 cases of errors here, empty channel and no more
173-
// senders. In both cases we don't really care at this point.
174-
if let Ok(packet) = from_routing_control.try_recv() {
175-
if let Err(e) = framed.feed(Packet::ControlPacket(packet)).await {
176-
error!("Failed to feed data packet to connection: {e}");
176+
needs_flush = true;
177+
178+
if let Err(e) = sink.feed(Packet::ControlPacket(packet)).await {
179+
error!("Failed to feed control packet to connection: {e}");
177180
break
178181
}
179-
} else {
180-
// No packets ready, flush currently buffered ones
181-
break
182+
183+
for _ in 1..PACKET_COALESCE_WINDOW {
184+
// There can be 2 cases of errors here, empty channel and no more
185+
// senders. In both cases we don't really care at this point.
186+
if let Ok(packet) = from_routing_control.try_recv() {
187+
if let Err(e) = sink.feed(Packet::ControlPacket(packet)).await {
188+
error!("Failed to feed data packet to connection: {e}");
189+
break
190+
}
191+
} else {
192+
// No packets ready, flush currently buffered ones
193+
break
194+
}
195+
}
182196
}
183197
}
198+
}
184199

185-
if let Err(e) = framed.flush().await {
186-
error!("Failed to flush buffered peer connection control packets: {e}");
200+
r = sink.flush(), if needs_flush => {
201+
if let Err(err) = r {
202+
error!("Failed to flush peer connection: {err}");
187203
break
188204
}
205+
needs_flush = false;
189206
}
190207

191208
_ = death_watcher.notified() => {
209+
// Attempt gracefull shutdown
210+
let mut framed = sink.reunite(stream).expect("SplitSink and SplitStream here can only be part of the same original Framned; Qed");
211+
let _ = framed.close().await;
192212
break;
193213
}
194214
}
@@ -203,7 +223,7 @@ impl Peer {
203223
error!("Peer {remote_id} could not notify router of termination: {e}");
204224
}
205225
});
206-
}
226+
};
207227

208228
Ok(peer)
209229
}

0 commit comments

Comments
 (0)