Skip to content

Commit caafcbc

Browse files
committed
make packet queue unbound, wake loop thread when buffer is full/empty
1 parent 95674f2 commit caafcbc

File tree

4 files changed

+23
-23
lines changed

4 files changed

+23
-23
lines changed

crates/shadowsocks-service/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ once_cell = "1.8"
8989
thiserror = "1.0"
9090
arc-swap = "1.3"
9191

92-
spin = { version = "0.9", features = ["std"] }
92+
spin = { version = "0.9" }
9393
lru_time_cache = "0.11"
9494
bytes = "1.0"
9595
byte_string = "1.0"

crates/shadowsocks-service/src/local/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ pub async fn create(config: Config) -> io::Result<Server> {
125125
accept_opts.tcp.nodelay = config.no_delay;
126126
accept_opts.tcp.fastopen = config.fast_open;
127127
accept_opts.tcp.keepalive = config.keep_alive.or(Some(LOCAL_DEFAULT_KEEPALIVE_TIMEOUT));
128+
context.set_accept_opts(accept_opts);
128129

129130
if let Some(resolver) = build_dns_resolver(config.dns, config.ipv6_first, context.connect_opts_ref()).await {
130131
context.set_dns_resolver(Arc::new(resolver));

crates/shadowsocks-service/src/local/tun/tcp.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,9 @@ use crate::local::{
3838

3939
use super::virt_device::VirtTunDevice;
4040

41-
// NOTE: Default value is taken from Linux
42-
// recv: /proc/sys/net/ipv4/tcp_rmem 87380 bytes
43-
// send: /proc/sys/net/ipv4/tcp_wmem 16384 bytes
44-
const DEFAULT_TCP_SEND_BUFFER_SIZE: u32 = 16384;
45-
const DEFAULT_TCP_RECV_BUFFER_SIZE: u32 = 87380;
41+
// NOTE: Default buffer could contain 20 AEAD packets
42+
const DEFAULT_TCP_SEND_BUFFER_SIZE: u32 = 0x3FFF * 20;
43+
const DEFAULT_TCP_RECV_BUFFER_SIZE: u32 = 0x3FFF * 20;
4644

4745
struct TcpSocketControl {
4846
send_buffer: RingBuffer<'static, u8>,
@@ -147,7 +145,9 @@ impl AsyncRead for TcpConnection {
147145
let n = control.recv_buffer.dequeue_slice(recv_buf);
148146
buf.advance(n);
149147

150-
self.manager_notify.notify();
148+
if control.recv_buffer.is_empty() {
149+
self.manager_notify.notify();
150+
}
151151
Ok(()).into()
152152
}
153153
}
@@ -173,7 +173,9 @@ impl AsyncWrite for TcpConnection {
173173

174174
let n = control.send_buffer.enqueue_slice(buf);
175175

176-
self.manager_notify.notify();
176+
if control.send_buffer.is_full() {
177+
self.manager_notify.notify();
178+
}
177179
Ok(n).into()
178180
}
179181

@@ -207,7 +209,7 @@ pub struct TcpTun {
207209
manager_running: Arc<AtomicBool>,
208210
balancer: PingBalancer,
209211
iface_rx: mpsc::UnboundedReceiver<Vec<u8>>,
210-
iface_tx: mpsc::Sender<Vec<u8>>,
212+
iface_tx: mpsc::UnboundedSender<Vec<u8>>,
211213
}
212214

213215
impl Drop for TcpTun {
@@ -278,10 +280,8 @@ impl TcpTun {
278280
}
279281
};
280282

281-
let after_poll = SmolInstant::now();
282-
283283
if updated_sockets {
284-
trace!("VirtDevice::poll costed {}", after_poll - before_poll);
284+
trace!("VirtDevice::poll costed {}", SmolInstant::now() - before_poll);
285285
}
286286

287287
// Check all the sockets' status
@@ -377,11 +377,8 @@ impl TcpTun {
377377
iface.remove_socket(socket_handle);
378378
}
379379

380-
let next_duration = iface
381-
.poll_delay(SmolInstant::now())
382-
.unwrap_or(SmolDuration::from_millis(1));
383-
384-
if next_duration.total_millis() != 0 {
380+
let next_duration = iface.poll_delay(before_poll).unwrap_or(SmolDuration::from_millis(5));
381+
if next_duration != SmolDuration::ZERO {
385382
thread::park_timeout(Duration::from(next_duration));
386383
}
387384
}
@@ -422,10 +419,10 @@ impl TcpTun {
422419
TcpSocketBuffer::new(vec![0u8; send_buffer_size as usize]),
423420
);
424421
socket.set_keep_alive(accept_opts.tcp.keepalive.map(From::from));
425-
// FIXME: This should follows system's setting. 7200 is Linux's default.
422+
// FIXME: It should follow system's setting. 7200 is Linux's default.
426423
socket.set_timeout(Some(SmolDuration::from_secs(7200)));
427424
// NO ACK delay
428-
socket.set_ack_delay(None);
425+
// socket.set_ack_delay(None);
429426

430427
if let Err(err) = socket.listen(dst_addr) {
431428
return Err(io::Error::new(ErrorKind::Other, err));
@@ -454,7 +451,7 @@ impl TcpTun {
454451
}
455452

456453
pub async fn drive_interface_state(&mut self, frame: &[u8]) {
457-
if let Err(..) = self.iface_tx.send(frame.to_vec()).await {
454+
if let Err(..) = self.iface_tx.send(frame.to_vec()) {
458455
panic!("interface send channel closed unexpectly");
459456
}
460457

crates/shadowsocks-service/src/local/tun/virt_device.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ use tokio::sync::mpsc;
88

99
pub struct VirtTunDevice {
1010
capabilities: DeviceCapabilities,
11-
in_buf: mpsc::Receiver<Vec<u8>>,
11+
in_buf: mpsc::UnboundedReceiver<Vec<u8>>,
1212
out_buf: mpsc::UnboundedSender<Vec<u8>>,
1313
}
1414

1515
impl VirtTunDevice {
16-
pub fn new(capabilities: DeviceCapabilities) -> (Self, mpsc::UnboundedReceiver<Vec<u8>>, mpsc::Sender<Vec<u8>>) {
16+
pub fn new(
17+
capabilities: DeviceCapabilities,
18+
) -> (Self, mpsc::UnboundedReceiver<Vec<u8>>, mpsc::UnboundedSender<Vec<u8>>) {
1719
let (iface_tx, iface_output) = mpsc::unbounded_channel();
18-
let (iface_input, iface_rx) = mpsc::channel(512);
20+
let (iface_input, iface_rx) = mpsc::unbounded_channel();
1921

2022
(
2123
Self {

0 commit comments

Comments
 (0)