Skip to content

Commit ed5cde7

Browse files
committed
Handle timeouts during established connection.
I think we still need to ensure that we can timeout during the handshakes.
1 parent 44e1991 commit ed5cde7

File tree

11 files changed

+562
-316
lines changed

11 files changed

+562
-316
lines changed

src/config_cli.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::time::Duration;
22

3+
use crate::wire_config::{MIN_DEFAULT_TIMEOUT, MIN_DEFAULT_TIMEOUT_PACKETS};
4+
35
use clap::{Arg, ArgAction, ArgGroup, ArgMatches, Command, value_parser};
46
use declarative_enum_dispatch::enum_dispatch;
7+
use humantime::format_duration;
58

69
const DEFAULT_TUN_NAME: &str = "tun-i405";
710
// TODO ensure there are no issues when this is greater than the inter-packet interval
@@ -30,9 +33,11 @@ pub(crate) struct WireConfigCli {
3033
pub(crate) outgoing_packet_length: Option<u64>,
3134
pub(crate) outgoing_finalize_delta: u64,
3235
pub(crate) outgoing_interval: WireIntervalCli,
36+
pub(crate) client_timeout: Option<Duration>,
3337
pub(crate) incoming_packet_length: Option<u64>,
3438
pub(crate) incoming_finalize_delta: u64,
3539
pub(crate) incoming_interval: WireIntervalCli,
40+
pub(crate) server_timeout: Option<Duration>,
3641
}
3742

3843
impl EzClap for WireConfigCli {
@@ -64,6 +69,10 @@ impl EzClap for WireConfigCli {
6469
.default_value(DEFAULT_FINALIZE_DELTA)
6570
.value_parser(humantime::parse_duration)
6671
.help("How long to finalize the contents of a packet to be sent before actually sending it. If too short, then packets may frequently be delayed and not sent at the intended times. This is most likely to happen under system load, and those delayed packets could indicate to an attacker that your system is under load. Warnings are logged whenever this happens, so increase this value if you see those warnings frequently."),
72+
Arg::new("client_timeout")
73+
.long("client-timeout")
74+
.value_parser(humantime::parse_duration)
75+
.help(format!("The client will disconnect if no packet is received from the server within this amount of time. Defaults to max({}, {MIN_DEFAULT_TIMEOUT_PACKETS}*incoming_packet_interval), ie, will disconnect after at least 10 seconds have elapsed and at least {MIN_DEFAULT_TIMEOUT_PACKETS} packets are dropped/delayed.", format_duration(MIN_DEFAULT_TIMEOUT))),
6776
Arg::new("incoming_packet_length")
6877
.long("incoming-packet-length")
6978
.visible_alias("down-packet-length")
@@ -91,6 +100,10 @@ impl EzClap for WireConfigCli {
91100
// TODO make sure we print warnings for delayed finalization on the server, on the
92101
// client (via statistics).
93102
.help("See the documentation for --outgoing-finalize-delta"),
103+
Arg::new("server_timeout")
104+
.long("server-timeout")
105+
.value_parser(humantime::parse_duration)
106+
.help("See --client-timeout"),
94107
]
95108
}
96109

@@ -148,6 +161,7 @@ impl EzClap for WireConfigCli {
148161
.as_nanos()
149162
.try_into()
150163
.unwrap(),
164+
client_timeout: matches.get_one::<Duration>("client_timeout").cloned(),
151165
incoming_packet_length: matches
152166
.get_one::<bytesize::ByteSize>("incoming_packet_length")
153167
.map(|x| x.as_u64()),
@@ -158,6 +172,7 @@ impl EzClap for WireConfigCli {
158172
.as_nanos()
159173
.try_into()
160174
.unwrap(),
175+
server_timeout: matches.get_one::<Duration>("server_timeout").cloned(),
161176
}
162177
}
163178
}

src/core/client.rs

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -263,15 +263,16 @@ impl C2SHandshakeSent {
263263

264264
fn send_one_handshake(&mut self, config: &Config, hardware: &mut impl Hardware) -> Result<()> {
265265
let mut builder = messages::PacketBuilder::new(
266-
ip_to_i405_length(config.c2s_wire_config.packet_length, config.peer_address).into(),
266+
ip_to_i405_length(config.client_wire_config.packet_length, config.peer_address).into(),
267267
);
268268
let c2s_handshake = messages::ClientToServerHandshake {
269269
protocol_version: PROTOCOL_VERSION,
270270
oldest_compatible_protocol_version: OLDEST_COMPATIBLE_PROTOCOL_VERSION,
271-
s2c_packet_length: config.s2c_wire_config.packet_length,
272-
s2c_packet_interval_min: config.s2c_wire_config.packet_interval_min,
273-
s2c_packet_interval_max: config.s2c_wire_config.packet_interval_max,
274-
s2c_packet_finalize_delta: config.s2c_wire_config.packet_finalize_delta,
271+
s2c_packet_length: config.server_wire_config.packet_length,
272+
s2c_packet_interval_min: config.server_wire_config.packet_interval_min,
273+
s2c_packet_interval_max: config.server_wire_config.packet_interval_max,
274+
s2c_packet_finalize_delta: config.server_wire_config.packet_finalize_delta,
275+
server_timeout: config.server_wire_config.timeout,
275276
};
276277
let did_add =
277278
builder.try_add_message(&messages::Message::ClientToServerHandshake(c2s_handshake));
@@ -393,7 +394,7 @@ impl ConnectionStateTrait for C2SHandshakeSent {
393394
hardware,
394395
self.session,
395396
config.peer_address,
396-
config.c2s_wire_config.clone(),
397+
config.client_wire_config.clone(),
397398
)?,
398399
))
399400
}
@@ -412,15 +413,46 @@ impl ConnectionStateTrait for C2SHandshakeSent {
412413
}
413414
}
414415

416+
impl EstablishedConnection {
417+
// stupid naming _s because enum_dispatch won't let us use the same name here and in server
418+
fn handle_is_connection_open_c(
419+
self,
420+
config: &Config,
421+
hardware: &mut impl Hardware,
422+
is_connection_open: IsConnectionOpen,
423+
) -> Result<ConnectionState> {
424+
match is_connection_open {
425+
IsConnectionOpen::Yes => Ok(ConnectionState::EstablishedConnection(self)),
426+
IsConnectionOpen::TimedOut => {
427+
log::warn!(
428+
"Received no packets from server in a while -- returning to NoConnection state"
429+
);
430+
Ok(ConnectionState::NoConnection(NoConnection::new(
431+
config, hardware,
432+
)?))
433+
}
434+
IsConnectionOpen::TerminatedNormally => {
435+
log::info!(
436+
"Client terminated connection normally -- returning to NoConnection state"
437+
);
438+
Ok(ConnectionState::NoConnection(NoConnection::new(
439+
config, hardware,
440+
)?))
441+
}
442+
}
443+
}
444+
}
445+
415446
impl ConnectionStateTrait for EstablishedConnection {
416447
fn on_timer(
417448
mut self,
418-
_config: &Config,
449+
config: &Config,
419450
hardware: &mut impl Hardware,
420451
timer_timestamp: u64,
421452
) -> Result<ConnectionState> {
422-
EstablishedConnection::on_timer(&mut self, hardware, timer_timestamp)?;
423-
Ok(ConnectionState::EstablishedConnection(self))
453+
let is_connection_open =
454+
EstablishedConnection::on_timer(&mut self, hardware, timer_timestamp)?;
455+
return self.handle_is_connection_open_c(config, hardware, is_connection_open);
424456
}
425457

426458
fn on_read_outgoing_packet(
@@ -440,19 +472,9 @@ impl ConnectionStateTrait for EstablishedConnection {
440472
hardware: &mut impl Hardware,
441473
packet: &[u8],
442474
) -> Result<ConnectionState> {
443-
match EstablishedConnection::on_read_incoming_packet(&mut self, hardware, packet)? {
444-
IsConnectionOpen::Yes => Ok(ConnectionState::EstablishedConnection(self)),
445-
IsConnectionOpen::No => {
446-
log::info!(
447-
"Server informed us that it shut down normally -- will attempt to reconnect, in case it's restarting."
448-
);
449-
// TODO this should be handled by some global policy on whether to retry or shut
450-
// down after errors, rather than always retrying by resetting to NoConnection.
451-
return Ok(ConnectionState::NoConnection(NoConnection::new(
452-
config, hardware,
453-
)?));
454-
}
455-
}
475+
let is_connection_open =
476+
EstablishedConnection::on_read_incoming_packet(&mut self, hardware, packet)?;
477+
self.handle_is_connection_open_c(config, hardware, is_connection_open)
456478
}
457479

458480
fn on_terminate(self, config: &Config, hardware: &mut impl Hardware) -> Result<()> {
@@ -465,8 +487,8 @@ impl ConnectionStateTrait for EstablishedConnection {
465487

466488
#[derive(Debug, PartialEq, Eq, Clone)]
467489
pub(crate) struct Config {
468-
pub(crate) c2s_wire_config: WireConfig,
469-
pub(crate) s2c_wire_config: WireConfig,
490+
pub(crate) client_wire_config: WireConfig,
491+
pub(crate) server_wire_config: WireConfig,
470492
pub(crate) peer_address: SocketAddr,
471493
pub(crate) pre_shared_key: Vec<u8>,
472494
}

src/core/established_connection.rs

Lines changed: 67 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,31 @@ use anyhow::Result;
1616
pub(crate) struct EstablishedConnection {
1717
session: dtls::EstablishedSession,
1818
peer: std::net::SocketAddr,
19-
outgoing_wire_config: WireConfig,
19+
wire_config: WireConfig,
20+
last_incoming_packet_timestamp: u64,
2021
jitterator: Jitterator,
2122
outgoing_connection: OutgoingConnection,
2223
partial_outgoing_packet: messages::WriteCursor<IpPacketBuffer>,
2324
defragger: Defragger,
2425
}
2526

27+
// OutgoingConnection is sort of historical cruft, there was once a grand plan for a "scheduled"
28+
// mode where outbound packets would be read off the TUN at predetermined times also, in order to
29+
// hide even from userspace applications the details of the tunnel, and there would be both
30+
// scheduled and unscheduled OutgoingConnection implementations.
2631
/// The OutgoingConnection is responsible for reading outgoing packets.
2732
#[derive(Debug)]
28-
enum OutgoingConnection {
29-
Unscheduled {
30-
queued_packet: Box<Option<QueuedIpPacket>>,
31-
fragmentation_id: u16,
32-
},
33+
struct OutgoingConnection {
34+
queued_packet: Box<Option<QueuedIpPacket>>,
35+
fragmentation_id: u16,
3336
}
3437

3538
impl OutgoingConnection {
3639
fn new(hardware: &mut impl Hardware) -> Self {
3740
// unless we have a queued packet (which we don't yet), we want the hardware to always know
3841
// we are ready to read an outgoing packet.
3942
hardware.read_outgoing_packet();
40-
Self::Unscheduled {
43+
Self {
4144
queued_packet: Box::new(None),
4245
fragmentation_id: 0,
4346
}
@@ -50,33 +53,26 @@ impl OutgoingConnection {
5053
hardware: &mut impl Hardware,
5154
write_cursor: &mut messages::WriteCursor<IpPacketBuffer>,
5255
) {
53-
match self {
54-
OutgoingConnection::Unscheduled {
55-
queued_packet,
56-
fragmentation_id,
57-
} => {
58-
// this indirection is me being afraid that we're going to accidentally reallocate
59-
// the box if we try to `take` the `queued_packet` directly (it might try to `take`
60-
// the `Box` and then `expect` will still work due to deref coercion?).
61-
let queued_packet_mut: &mut Option<QueuedIpPacket> = queued_packet;
62-
if let Some(old_queued_packet) = std::mem::take(queued_packet_mut) {
63-
let bytes_left = write_cursor.num_bytes_left();
64-
*fragmentation_id = fragmentation_id.wrapping_add(1);
65-
let fragment = old_queued_packet.fragment(bytes_left, *fragmentation_id);
66-
match fragment {
67-
FragmentResult::Done(msg) => {
68-
msg.serialize(write_cursor);
69-
// queue is empty, let's ask for another packet!
70-
hardware.read_outgoing_packet();
71-
}
72-
FragmentResult::Partial(msg, new_queued_packet) => {
73-
msg.serialize(write_cursor);
74-
*queued_packet_mut = Some(new_queued_packet);
75-
}
76-
FragmentResult::MaxLengthTooShort(new_queued_packet) => {
77-
*queued_packet_mut = Some(new_queued_packet);
78-
}
79-
}
56+
// this indirection is me being afraid that we're going to accidentally reallocate
57+
// the box if we try to `take` the `queued_packet` directly (it might try to `take`
58+
// the `Box` and then `expect` will still work due to deref coercion?).
59+
let queued_packet_mut: &mut Option<QueuedIpPacket> = &mut self.queued_packet;
60+
if let Some(old_queued_packet) = std::mem::take(queued_packet_mut) {
61+
let bytes_left = write_cursor.num_bytes_left();
62+
self.fragmentation_id = self.fragmentation_id.wrapping_add(1);
63+
let fragment = old_queued_packet.fragment(bytes_left, self.fragmentation_id);
64+
match fragment {
65+
FragmentResult::Done(msg) => {
66+
msg.serialize(write_cursor);
67+
// queue is empty, let's ask for another packet!
68+
hardware.read_outgoing_packet();
69+
}
70+
FragmentResult::Partial(msg, new_queued_packet) => {
71+
msg.serialize(write_cursor);
72+
*queued_packet_mut = Some(new_queued_packet);
73+
}
74+
FragmentResult::MaxLengthTooShort(new_queued_packet) => {
75+
*queued_packet_mut = Some(new_queued_packet);
8076
}
8177
}
8278
}
@@ -89,20 +85,13 @@ impl OutgoingConnection {
8985
packet: &[u8],
9086
_recv_timestamp: u64,
9187
) {
92-
match self {
93-
OutgoingConnection::Unscheduled {
94-
queued_packet,
95-
fragmentation_id: _,
96-
} => {
97-
// strategy: queue the packet, then dequeue it!
98-
assert!(
99-
queued_packet.is_none(),
100-
"We never request to read an outgoing packet while we have a queued packet"
101-
);
102-
**queued_packet = Some(QueuedIpPacket::new(packet));
103-
self.try_to_dequeue(hardware, write_cursor);
104-
}
105-
}
88+
// strategy: queue the packet, then dequeue it!
89+
assert!(
90+
self.queued_packet.is_none(),
91+
"We never request to read an outgoing packet while we have a queued packet"
92+
);
93+
*self.queued_packet = Some(QueuedIpPacket::new(packet));
94+
self.try_to_dequeue(hardware, write_cursor);
10695
}
10796
}
10897

@@ -128,7 +117,14 @@ impl EstablishedConnection {
128117
ip_to_i405_length(outgoing_wire_config.packet_length, peer).into(),
129118
)),
130119
defragger: Defragger::new(),
131-
outgoing_wire_config,
120+
wire_config: outgoing_wire_config,
121+
// this is a tiny bit jank in the client case, because the server won't start sending us
122+
// packets until it receives our first post-handshake packet. If we have fast incoming
123+
// intervals but long roundtrip time, it's possible that quite a few incoming intervals
124+
// will elapse before we start receiving anything from the server. We can't just set
125+
// this to None though and wait for the first server packet, because the server could
126+
// theoretically crash even now!
127+
last_incoming_packet_timestamp: hardware.timestamp(),
132128
jitterator,
133129
})
134130
}
@@ -141,14 +137,14 @@ impl EstablishedConnection {
141137
&mut self,
142138
hardware: &mut impl Hardware,
143139
timer_timestamp: u64,
144-
) -> Result<()> {
140+
) -> Result<IsConnectionOpen> {
145141
// timer means that it's about time to send a packet -- let's finalize the packet and send
146142
// it to the hardware!
147-
let send_timestamp = timer_timestamp + self.outgoing_wire_config.packet_finalize_delta;
143+
let send_timestamp = timer_timestamp + self.wire_config.packet_finalize_delta;
148144
let outgoing_cleartext_packet = std::mem::replace(
149145
&mut self.partial_outgoing_packet,
150146
messages::WriteCursor::new(IpPacketBuffer::new_empty(
151-
ip_to_i405_length(self.outgoing_wire_config.packet_length, self.peer).into(),
147+
ip_to_i405_length(self.wire_config.packet_length, self.peer).into(),
152148
)),
153149
)
154150
.into_inner();
@@ -165,10 +161,14 @@ impl EstablishedConnection {
165161
hardware.register_interval(next_interval);
166162
self.outgoing_connection
167163
.try_to_dequeue(hardware, &mut self.partial_outgoing_packet);
168-
hardware.set_timer(
169-
send_timestamp + next_interval - self.outgoing_wire_config.packet_finalize_delta,
170-
);
171-
Ok(())
164+
hardware.set_timer(send_timestamp + next_interval - self.wire_config.packet_finalize_delta);
165+
166+
// check if the incoming connection timed out
167+
if hardware.timestamp() > self.last_incoming_packet_timestamp + self.wire_config.timeout {
168+
return Ok(IsConnectionOpen::TimedOut);
169+
}
170+
171+
Ok(IsConnectionOpen::Yes)
172172
}
173173

174174
pub(crate) fn on_read_outgoing_packet<H: Hardware>(
@@ -194,15 +194,20 @@ impl EstablishedConnection {
194194
) -> Result<IsConnectionOpen> {
195195
match self.session.decrypt_datagram(packet) {
196196
dtls::DecryptResult::Decrypted(cleartext_packet) => {
197-
self.on_read_incoming_cleartext_packet(hardware, &cleartext_packet)
197+
// notice how we only update the last_incoming_packet_timestamp on a successful
198+
// decryption. Otherwise, eg if the client restarts unexpectedly, it might keep
199+
// attempting a handshake so we won't time out.
200+
self.last_incoming_packet_timestamp = hardware.timestamp();
201+
self.on_read_incoming_cleartext_packet(hardware, &cleartext_packet)?;
202+
Ok(IsConnectionOpen::Yes)
198203
}
199204
dtls::DecryptResult::SendThese(send_these) => {
200205
for packet in send_these {
201206
hardware.send_outgoing_packet(&packet, self.peer, None)?;
202207
}
203208
Ok(IsConnectionOpen::Yes)
204209
}
205-
dtls::DecryptResult::Terminated => Ok(IsConnectionOpen::No),
210+
dtls::DecryptResult::Terminated => Ok(IsConnectionOpen::TerminatedNormally),
206211
dtls::DecryptResult::Err(err) => Err(err.into()),
207212
}
208213
}
@@ -211,7 +216,7 @@ impl EstablishedConnection {
211216
&mut self,
212217
hardware: &mut H,
213218
packet: &[u8],
214-
) -> Result<IsConnectionOpen> {
219+
) -> Result<()> {
215220
let mut cursor = messages::ReadCursor::new(packet);
216221
while messages::has_message(&cursor) {
217222
let msg = cursor.read()?;
@@ -237,7 +242,7 @@ impl EstablishedConnection {
237242
}
238243
}
239244
}
240-
Ok(IsConnectionOpen::Yes)
245+
Ok(())
241246
}
242247

243248
// When I name this just `on_terminate`, there's a conflict with the name of the same method
@@ -251,5 +256,6 @@ impl EstablishedConnection {
251256
#[must_use]
252257
pub(crate) enum IsConnectionOpen {
253258
Yes,
254-
No,
259+
TimedOut,
260+
TerminatedNormally,
255261
}

0 commit comments

Comments
 (0)