Skip to content

Commit ebe3944

Browse files
committed
Add automated outbound pings
The first instance where I think it is appropriate to share state between reader/writer/metrics, and potentially the only instance. After integrating BIP-324, the `TimeoutParams` will become `Params`, and contain the traffic shaping profile.
1 parent 6e68e6b commit ebe3944

File tree

2 files changed

+71
-4
lines changed

2 files changed

+71
-4
lines changed

src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ pub struct ConnectionMetrics {
103103
their_preferences: Arc<Preferences>,
104104
timed_messages: Arc<Mutex<TimedMessages>>,
105105
start_time: Instant,
106+
outbound_ping_state: Arc<Mutex<OutboundPing>>,
106107
}
107108

108109
impl ConnectionMetrics {
@@ -126,6 +127,17 @@ impl ConnectionMetrics {
126127
pub fn connection_time(&self, now: Instant) -> Duration {
127128
now.duration_since(self.start_time)
128129
}
130+
131+
/// Has the connection failed to respond to a ping after the given duration.
132+
pub fn ping_timed_out(&self, timeout: Duration) -> bool {
133+
if let Ok(lock) = self.outbound_ping_state.lock() {
134+
match *lock {
135+
OutboundPing::Waiting { nonce: _, then } => return then.elapsed() > timeout,
136+
_ => return false,
137+
}
138+
}
139+
false
140+
}
129141
}
130142

131143
/// The rate at which a peer sends a particular message
@@ -239,6 +251,12 @@ impl Default for TimedMessages {
239251
}
240252
}
241253

254+
#[derive(Debug, Clone, Copy)]
255+
enum OutboundPing {
256+
Waiting { nonce: u64, then: Instant },
257+
LastReceived { then: Instant },
258+
}
259+
242260
pub trait SeedsExt {
243261
fn seeds(&self) -> Vec<&str>;
244262
}

src/net.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ use p2p::{
2121

2222
use crate::{
2323
handshake::{self, CompletedHandshake, ConnectionConfig},
24-
ConnectionMetrics, Preferences, TimedMessage, TimedMessages,
24+
ConnectionMetrics, OutboundPing, Preferences, TimedMessage, TimedMessages,
2525
};
2626

2727
pub const READ_TIMEOUT: Duration = Duration::from_secs(60);
28+
pub const PING_INTERVAL: Duration = Duration::from_secs(30);
2829

2930
/// Open or begin a connection to an inbound or outbound peer.
3031
pub trait ConnectionExt: Send + Sync {
@@ -33,6 +34,7 @@ pub trait ConnectionExt: Send + Sync {
3334
fn handshake(
3435
self,
3536
tcp_stream: TcpStream,
37+
timeout_params: TimeoutParams,
3638
) -> Result<(ConnectionWriter, ConnectionReader, ConnectionMetrics), Error>;
3739

3840
/// Listen for inbound connections on the specified socket address.
@@ -59,7 +61,7 @@ impl ConnectionExt for ConnectionConfig {
5961
let tcp_stream = TcpStream::connect(to.into())?;
6062
tcp_stream.set_read_timeout(timeout_params.read)?;
6163
tcp_stream.set_write_timeout(timeout_params.write)?;
62-
Self::handshake(self, tcp_stream)
64+
Self::handshake(self, tcp_stream, timeout_params)
6365
}
6466

6567
fn listen(
@@ -71,12 +73,13 @@ impl ConnectionExt for ConnectionConfig {
7173
let (tcp_stream, _) = listener.accept()?;
7274
tcp_stream.set_read_timeout(timeout_params.read)?;
7375
tcp_stream.set_write_timeout(timeout_params.write)?;
74-
Self::handshake(self, tcp_stream)
76+
Self::handshake(self, tcp_stream, timeout_params)
7577
}
7678

7779
fn handshake(
7880
self,
7981
mut tcp_stream: TcpStream,
82+
timeout_params: TimeoutParams,
8083
) -> Result<(ConnectionWriter, ConnectionReader, ConnectionMetrics), Error> {
8184
let system_time = SystemTime::now();
8285
let unix_time = system_time
@@ -102,6 +105,9 @@ impl ConnectionExt for ConnectionConfig {
102105
write_half.write_message(response, &mut tcp_stream)?;
103106
}
104107
let timed_messages = Arc::new(Mutex::new(TimedMessages::new()));
108+
let outbound_ping = Arc::new(Mutex::new(OutboundPing::LastReceived {
109+
then: Instant::now(),
110+
}));
105111
let CompletedHandshake {
106112
feeler,
107113
their_preferences,
@@ -111,13 +117,16 @@ impl ConnectionExt for ConnectionConfig {
111117
their_preferences: Arc::clone(&their_preferences),
112118
timed_messages: Arc::clone(&timed_messages),
113119
start_time: Instant::now(),
120+
outbound_ping_state: Arc::clone(&outbound_ping),
114121
};
115122
let (tx, rx) = mpsc::channel();
116123
let tcp_stream_clone = tcp_stream.try_clone()?;
117124
let open_writer = OpenWriter {
118125
tcp_stream: tcp_stream_clone,
119126
transport: write_half,
120127
receiver: rx,
128+
outbound_ping_state: Arc::clone(&outbound_ping),
129+
ping_interval: timeout_params.ping_interval,
121130
};
122131
let write_handle =
123132
std::thread::spawn(move || open_writer.maintain_connection());
@@ -130,6 +139,7 @@ impl ConnectionExt for ConnectionConfig {
130139
transport: read_half,
131140
their_preferences,
132141
timed_messages,
142+
outbound_ping_state: Arc::clone(&outbound_ping),
133143
};
134144
return Ok((writer, reader, live_connection));
135145
}
@@ -144,6 +154,7 @@ impl ConnectionExt for ConnectionConfig {
144154
pub struct TimeoutParams {
145155
read: Option<Duration>,
146156
write: Option<Duration>,
157+
ping_interval: Duration,
147158
}
148159

149160
impl TimeoutParams {
@@ -158,13 +169,18 @@ impl TimeoutParams {
158169
pub fn write_timeout(&mut self, timeout: Duration) {
159170
self.write = Some(timeout)
160171
}
172+
173+
pub fn ping_interval(&mut self, every: Duration) {
174+
self.ping_interval = every
175+
}
161176
}
162177

163178
impl Default for TimeoutParams {
164179
fn default() -> Self {
165180
Self {
166181
read: Some(READ_TIMEOUT),
167182
write: None,
183+
ping_interval: PING_INTERVAL,
168184
}
169185
}
170186
}
@@ -199,6 +215,8 @@ struct OpenWriter {
199215
tcp_stream: TcpStream,
200216
transport: WriteTransport,
201217
receiver: mpsc::Receiver<NetworkMessage>,
218+
outbound_ping_state: Arc<Mutex<OutboundPing>>,
219+
ping_interval: Duration,
202220
}
203221

204222
impl OpenWriter {
@@ -215,7 +233,24 @@ impl OpenWriter {
215233
_ => return Ok(()),
216234
},
217235
}
218-
// Do traffic shaping or send ping
236+
if let Ok(mut ping) = self.outbound_ping_state.lock() {
237+
match *ping {
238+
OutboundPing::LastReceived { then } => {
239+
if then.elapsed() > self.ping_interval {
240+
let nonce: u64 = random();
241+
self.transport
242+
.write_message(NetworkMessage::Ping(nonce), &mut self.tcp_stream)?;
243+
244+
*ping = OutboundPing::Waiting {
245+
nonce,
246+
then: Instant::now(),
247+
}
248+
}
249+
}
250+
OutboundPing::Waiting { nonce: _, then: _ } => continue,
251+
}
252+
}
253+
// Do traffic shaping or gossip addrs
219254
}
220255
}
221256
}
@@ -227,6 +262,7 @@ pub struct ConnectionReader {
227262
transport: ReadTransport,
228263
their_preferences: Arc<Preferences>,
229264
timed_messages: Arc<Mutex<TimedMessages>>,
265+
outbound_ping_state: Arc<Mutex<OutboundPing>>,
230266
}
231267

232268
impl ConnectionReader {
@@ -264,6 +300,19 @@ impl ConnectionReader {
264300
lock.add_many(TimedMessage::Addr, list.0.len(), Instant::now());
265301
}
266302
}
303+
NetworkMessage::Pong(pong) => {
304+
// There are bigger problems with this connection if the lock fails, so it is
305+
// okay to ignore the nonce.
306+
if let Ok(mut lock) = self.outbound_ping_state.lock() {
307+
if let OutboundPing::Waiting { nonce, then: _ } = *lock {
308+
if *pong == nonce {
309+
*lock = OutboundPing::LastReceived {
310+
then: Instant::now(),
311+
};
312+
}
313+
}
314+
}
315+
}
267316
_ => (),
268317
}
269318
}

0 commit comments

Comments
 (0)