Skip to content

Commit 3bac0af

Browse files
authored
Create a flow control for tcp (#265)
This branch adds a FlowControl mechanism that side channels the queues. The control handling process essentially applies back pressure by tracking the in flight requests end to end and enforcing the tcp_capacity on the system.
1 parent f992c9f commit 3bac0af

File tree

5 files changed

+454
-56
lines changed

5 files changed

+454
-56
lines changed

src/host.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::envelope::{hex, Datagram, Protocol, Segment, Syn};
22
#[cfg(feature = "unstable-fs")]
33
use crate::fs::{Fs, FsConfig};
4+
use crate::net::tcp::stream::BidiFlowControl;
45
use crate::net::{SocketPair, TcpListener, UdpSocket};
56
use crate::{Envelope, TRACING_TARGET};
67

@@ -11,10 +12,9 @@ use std::fmt::Display;
1112
use std::io;
1213
use std::net::{IpAddr, SocketAddr};
1314
use std::ops::RangeInclusive;
14-
#[cfg(not(feature = "unstable-fs"))]
1515
use std::sync::Arc;
1616
#[cfg(feature = "unstable-fs")]
17-
use std::sync::{Arc, Mutex};
17+
use std::sync::Mutex;
1818
use tokio::sync::{mpsc, Notify};
1919
use tokio::time::{Duration, Instant};
2020

@@ -346,11 +346,11 @@ struct ServerSocket {
346346
}
347347

348348
struct StreamSocket {
349-
local_addr: SocketAddr,
350349
buf: IndexMap<u64, SequencedSegment>,
351350
next_send_seq: u64,
352351
recv_seq: u64,
353352
sender: mpsc::Sender<SequencedSegment>,
353+
flow_control: BidiFlowControl,
354354
/// A simple reference counter for tracking read/write half drops. Once 0, the
355355
/// socket may be removed from the host.
356356
ref_ct: usize,
@@ -374,18 +374,19 @@ impl Display for SequencedSegment {
374374
}
375375

376376
impl StreamSocket {
377-
fn new(local_addr: SocketAddr, capacity: usize) -> (Self, mpsc::Receiver<SequencedSegment>) {
377+
fn new(capacity: usize) -> (Self, mpsc::Receiver<SequencedSegment>, BidiFlowControl) {
378378
let (tx, rx) = mpsc::channel(capacity);
379+
let flow_control = BidiFlowControl::new(capacity);
379380
let sock = Self {
380-
local_addr,
381381
buf: IndexMap::new(),
382382
next_send_seq: 1,
383383
recv_seq: 0,
384384
sender: tx,
385+
flow_control: flow_control.clone(),
385386
ref_ct: 2,
386387
};
387388

388-
(sock, rx)
389+
(sock, rx, flow_control)
389390
}
390391

391392
fn assign_seq(&mut self) -> u64 {
@@ -406,11 +407,17 @@ impl StreamSocket {
406407
while self.buf.contains_key(&(self.recv_seq + 1)) {
407408
self.recv_seq += 1;
408409

409-
let segment = self.buf.swap_remove(&self.recv_seq).unwrap();
410-
self.sender.try_send(segment).map_err(|e| match e {
411-
Closed(_) => Protocol::Tcp(Segment::Rst),
412-
Full(_) => panic!("{} socket buffer full", self.local_addr),
413-
})?;
410+
match self.sender.try_reserve() {
411+
Ok(permit) => {
412+
let segment = self.buf.swap_remove(&self.recv_seq).unwrap();
413+
permit.send(segment)
414+
}
415+
Err(Closed(())) => return Err(Protocol::Tcp(Segment::Rst)),
416+
Err(Full(())) => {
417+
self.recv_seq -= 1;
418+
break;
419+
}
420+
}
414421
}
415422

416423
Ok(())
@@ -451,14 +458,25 @@ impl Tcp {
451458
Ok(TcpListener::new(addr, notify))
452459
}
453460

454-
pub(crate) fn new_stream(&mut self, pair: SocketPair) -> mpsc::Receiver<SequencedSegment> {
455-
let (sock, rx) = StreamSocket::new(pair.local, self.socket_capacity);
461+
pub(crate) fn new_stream(
462+
&mut self,
463+
pair: SocketPair,
464+
) -> (mpsc::Receiver<SequencedSegment>, BidiFlowControl) {
465+
let (sock, rx, bidi) = StreamSocket::new(self.socket_capacity);
456466

457467
let exists = self.sockets.insert(pair, sock);
458468

459469
assert!(exists.is_none(), "{pair:?} is already connected");
460470

461-
rx
471+
(rx, bidi)
472+
}
473+
474+
pub(crate) fn flow_control(&self, pair: SocketPair) -> BidiFlowControl {
475+
self.sockets
476+
.get(&pair)
477+
.expect("missing stream socket")
478+
.flow_control
479+
.clone()
462480
}
463481

464482
pub(crate) fn stream_count(&self) -> usize {

src/net/tcp/listener.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,12 @@ impl TcpListener {
7171
});
7272

7373
let Some((syn, origin)) = maybe_accept else {
74-
// Wait for a new incoming connection, then retry.
7574
self.notify.notified().await;
7675
continue;
7776
};
7877

7978
tracing::trace!(target: TRACING_TARGET, src = ?origin, dst = ?self.local_addr, protocol = %"TCP SYN", "Recv");
8079

81-
// Send SYN-ACK -> origin. If Ok we proceed (acts as the ACK), else
82-
// we retry.
8380
let ack = syn.ack.send(());
8481
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, protocol = %"TCP SYN-ACK", "Send");
8582

@@ -89,19 +86,35 @@ impl TcpListener {
8986
};
9087

9188
let stream = World::current(|world| {
92-
let host = world.current_host_mut();
89+
let (pair, rx) = {
90+
let host = world.current_host_mut();
9391

94-
let mut my_addr = self.local_addr;
95-
if origin.ip().is_loopback() {
96-
my_addr.set_ip(origin.ip());
97-
}
98-
if my_addr.ip().is_unspecified() {
99-
my_addr.set_ip(host.addr);
100-
}
92+
let mut my_addr = self.local_addr;
93+
if origin.ip().is_loopback() {
94+
my_addr.set_ip(origin.ip());
95+
}
96+
if my_addr.ip().is_unspecified() {
97+
my_addr.set_ip(host.addr);
98+
}
99+
100+
let pair = SocketPair::new(my_addr, origin);
101+
let (rx, _) = host.tcp.new_stream(pair);
102+
(pair, rx)
103+
};
101104

102-
let pair = SocketPair::new(my_addr, origin);
103-
let rx = host.tcp.new_stream(pair);
104-
TcpStream::new(pair, rx)
105+
let client_ip = if origin.ip().is_loopback() {
106+
world.current.expect("current host missing")
107+
} else {
108+
origin.ip()
109+
};
110+
let client_pair = SocketPair::new(origin, pair.local);
111+
let bidi = world
112+
.hosts
113+
.get(&client_ip)
114+
.expect("client host missing")
115+
.tcp
116+
.flow_control(client_pair);
117+
TcpStream::new(pair, rx, bidi.invert())
105118
});
106119

107120
tracing::trace!(target: TRACING_TARGET, src = ?self.local_addr, dst = ?origin, "Accepted");

0 commit comments

Comments
 (0)