Skip to content

Commit 6239f34

Browse files
pinger: fix memory leak
If a Pinger never received any packets with the correct identifier it would never read from the round_receiver channel, making it slowly grow forever.
1 parent 726c3d1 commit 6239f34

File tree

1 file changed

+101
-65
lines changed

1 file changed

+101
-65
lines changed

src/pinger.rs

Lines changed: 101 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -94,80 +94,116 @@ impl<V: IpVersion> Pinger<V> {
9494
u16,
9595
mpsc::UnboundedSender<(V, Instant, Instant)>,
9696
> = HashMap::new();
97-
'packets: while let Ok(tuple) = raw_blocking.recv(&mut buf) {
98-
let packet = match tuple {
99-
Some(packet) if packet.identifier() == identifier => packet,
100-
_ => continue 'packets,
101-
};
102-
103-
let recv_instant = Instant::now();
104-
105-
#[cfg(not(feature = "strong"))]
106-
let send_instant = {
107-
let payload = packet.payload();
108-
match Instant::decode(payload[..Instant::ENCODED_LEN].try_into().unwrap()) {
109-
Some(send_instant) => send_instant,
110-
None => continue 'packets,
111-
}
112-
};
113-
114-
let packet_source = packet.source();
115-
let packet_sequence_number = packet.sequence_number();
116-
match subscribers.get(&packet_sequence_number) {
117-
Some(subscriber) => {
118-
#[cfg(feature = "strong")]
119-
if subscriber.send((packet_source, recv_instant)).is_err() {
120-
// Closed
121-
subscribers.remove(&packet_sequence_number);
122-
}
97+
'packets: while let Ok(maybe_packet) = raw_blocking.recv(&mut buf) {
98+
match &maybe_packet {
99+
Some(packet) if packet.identifier() == identifier => {
100+
let recv_instant = Instant::now();
123101

124102
#[cfg(not(feature = "strong"))]
125-
if subscriber
126-
.send((packet_source, send_instant, recv_instant))
127-
.is_err()
128-
{
129-
// Closed
130-
subscribers.remove(&packet_sequence_number);
131-
}
132-
}
133-
None => 'registrations: loop {
134-
match receiver.try_recv() {
135-
Ok(RoundMessage::Subscribe {
136-
sequence_number,
137-
sender,
138-
}) => {
139-
if packet_sequence_number == sequence_number {
140-
// Packet matches
141-
142-
#[cfg(feature = "strong")]
143-
if sender.send((packet_source, recv_instant)).is_err() {
144-
// Closed
145-
continue 'registrations;
146-
}
103+
let send_instant = {
104+
let payload = packet.payload();
105+
match Instant::decode(
106+
payload[..Instant::ENCODED_LEN].try_into().unwrap(),
107+
) {
108+
Some(send_instant) => send_instant,
109+
None => continue 'packets,
110+
}
111+
};
112+
113+
let packet_source = packet.source();
114+
let packet_sequence_number = packet.sequence_number();
115+
match subscribers.get(&packet_sequence_number) {
116+
Some(subscriber) => {
117+
#[cfg(feature = "strong")]
118+
if subscriber.send((packet_source, recv_instant)).is_err() {
119+
// Closed
120+
subscribers.remove(&packet_sequence_number);
121+
}
147122

148-
#[cfg(not(feature = "strong"))]
149-
if sender
150-
.send((packet_source, send_instant, recv_instant))
151-
.is_err()
152-
{
153-
// Closed
154-
continue 'registrations;
155-
}
123+
#[cfg(not(feature = "strong"))]
124+
if subscriber
125+
.send((packet_source, send_instant, recv_instant))
126+
.is_err()
127+
{
128+
// Closed
129+
subscribers.remove(&packet_sequence_number);
156130
}
157131

158-
subscribers.insert(sequence_number, sender);
132+
continue 'packets;
159133
}
160-
Ok(RoundMessage::Unsubscribe { sequence_number }) => {
161-
drop(subscribers.remove(&sequence_number));
162-
}
163-
Err(TryRecvError::Empty) => {
164-
break 'registrations;
134+
None => {
135+
// TODO: fix this duplication
136+
'registrations: loop {
137+
match receiver.try_recv() {
138+
Ok(RoundMessage::Subscribe {
139+
sequence_number,
140+
sender,
141+
}) => {
142+
if packet_sequence_number == sequence_number {
143+
// Packet matches
144+
145+
#[cfg(feature = "strong")]
146+
if sender
147+
.send((packet_source, recv_instant))
148+
.is_err()
149+
{
150+
// Closed
151+
continue 'registrations;
152+
}
153+
154+
#[cfg(not(feature = "strong"))]
155+
if sender
156+
.send((
157+
packet_source,
158+
send_instant,
159+
recv_instant,
160+
))
161+
.is_err()
162+
{
163+
// Closed
164+
continue 'registrations;
165+
}
166+
}
167+
168+
subscribers.insert(sequence_number, sender);
169+
}
170+
Ok(RoundMessage::Unsubscribe { sequence_number }) => {
171+
drop(subscribers.remove(&sequence_number));
172+
}
173+
Err(TryRecvError::Empty) => {
174+
break 'registrations;
175+
}
176+
Err(TryRecvError::Disconnected) => {
177+
break 'packets;
178+
}
179+
}
180+
}
165181
}
166-
Err(TryRecvError::Disconnected) => {
167-
break 'packets;
182+
}
183+
}
184+
Some(_packet) => {}
185+
None => {
186+
// TODO: fix this duplication
187+
'registrations: loop {
188+
match receiver.try_recv() {
189+
Ok(RoundMessage::Subscribe {
190+
sequence_number,
191+
sender,
192+
}) => {
193+
subscribers.insert(sequence_number, sender);
194+
}
195+
Ok(RoundMessage::Unsubscribe { sequence_number }) => {
196+
drop(subscribers.remove(&sequence_number));
197+
}
198+
Err(TryRecvError::Empty) => {
199+
break 'registrations;
200+
}
201+
Err(TryRecvError::Disconnected) => {
202+
break 'packets;
203+
}
168204
}
169205
}
170-
},
206+
}
171207
}
172208
}
173209
});

0 commit comments

Comments
 (0)