Skip to content

Commit 96e3e99

Browse files
committed
Refine ping ranking logic
1 parent 45e8bc6 commit 96e3e99

File tree

4 files changed

+96
-61
lines changed

4 files changed

+96
-61
lines changed

src/relay/loadbalancing/server/ping.rs

Lines changed: 60 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use std::{
22
collections::VecDeque,
3-
fmt, io,
3+
fmt,
4+
io,
45
sync::{
56
atomic::{AtomicU64, AtomicUsize, Ordering},
6-
Arc, Mutex,
7+
Arc,
8+
Mutex,
79
},
810
};
911

@@ -40,8 +42,14 @@ impl Server {
4042

4143
const MAX_LATENCY_QUEUE_SIZE: usize = 37;
4244

45+
#[derive(Debug, Copy, Clone)]
46+
enum Score {
47+
Latency(u64),
48+
Errored,
49+
}
50+
4351
struct ServerLatencyInner {
44-
latency_queue: VecDeque<u64>,
52+
latency_queue: VecDeque<Score>,
4553
}
4654

4755
impl ServerLatencyInner {
@@ -51,7 +59,7 @@ impl ServerLatencyInner {
5159
}
5260
}
5361

54-
fn push(&mut self, lat: u64) -> u64 {
62+
fn push(&mut self, lat: Score) -> u64 {
5563
self.latency_queue.push_back(lat);
5664
if self.latency_queue.len() > MAX_LATENCY_QUEUE_SIZE {
5765
self.latency_queue.pop_front();
@@ -62,18 +70,45 @@ impl ServerLatencyInner {
6270

6371
fn score(&self) -> u64 {
6472
if self.latency_queue.is_empty() {
65-
return u64::max_value();
73+
// Never checked, assume it is the worst of all
74+
return 2 * 1000;
75+
}
76+
77+
// 1. Mid Latency
78+
// 2. Proportion of Errors
79+
let mut vec_lat = Vec::with_capacity(self.latency_queue.len());
80+
let mut acc_err = 0;
81+
for lat in &self.latency_queue {
82+
match lat {
83+
Score::Latency(l) => vec_lat.push(l),
84+
Score::Errored => acc_err += 1,
85+
}
6686
}
6787

68-
let mut v = self.latency_queue.iter().cloned().collect::<Vec<u64>>();
69-
v.sort();
88+
let max_lat = DEFAULT_CHECK_TIMEOUT_SEC * 1000;
7089

71-
let mid = v.len() / 2;
72-
if (v.len() & 1) == 0 {
73-
(v[mid - 1] + v[mid]) / 2
90+
// Find the mid of latencies
91+
let mid_lat = if vec_lat.is_empty() {
92+
// The whole array are errors
93+
max_lat
7494
} else {
75-
v[mid]
76-
}
95+
vec_lat.sort();
96+
let mid = vec_lat.len() / 2;
97+
if vec_lat.len() % 2 == 0 {
98+
(vec_lat[mid] + vec_lat[mid - 1]) / 2
99+
} else {
100+
*vec_lat[mid]
101+
}
102+
};
103+
104+
// Score = norm_lat + prop_err
105+
//
106+
// 1. The lower latency, the better
107+
// 2. The lower errored count, the better
108+
let norm_lat = mid_lat as f64 / max_lat as f64;
109+
let prop_err = acc_err as f64 / self.latency_queue.len() as f64;
110+
111+
((norm_lat + prop_err) * 1000.0) as u64
77112
}
78113
}
79114

@@ -89,7 +124,7 @@ impl ServerLatency {
89124
}
90125
}
91126

92-
fn push(&self, lat: u64) -> u64 {
127+
fn push(&self, lat: Score) -> u64 {
93128
let mut inner = self.inner.lock().unwrap();
94129
inner.push(lat)
95130
}
@@ -135,20 +170,22 @@ impl Inner {
135170
tokio::spawn(
136171
// Check every DEFAULT_CHECK_INTERVAL_SEC seconds
137172
async move {
138-
// Wait until the server is ready (plugins are started)
139-
time::delay_for(Duration::from_secs(DEFAULT_CHECK_INTERVAL_SEC)).await;
140-
141173
let mut interval = time::interval(Duration::from_secs(DEFAULT_CHECK_INTERVAL_SEC));
142174

143175
while context.server_running() {
144-
interval.tick().await;
176+
// First round may be failed, plugins are started asynchronously
177+
145178
let score = match Inner::check_delay(&*sc, &*context).await {
146-
Ok(d) => latency.push(d),
147-
Err(..) => latency.push(DEFAULT_CHECK_TIMEOUT_SEC * 2 * 1000), // Penalty
179+
Ok(d) => latency.push(Score::Latency(d)),
180+
Err(..) => latency.push(Score::Errored), // Penalty
148181
};
149182
debug!("updated remote server {} (score: {})", sc.config.addr(), score);
150183
sc.score.store(score, Ordering::Release);
184+
185+
interval.tick().await;
151186
}
187+
188+
debug!("server {} latency ping task stopped", sc.config.addr());
152189
},
153190
);
154191
}
@@ -236,12 +273,10 @@ impl PingBalancer {
236273
tokio::spawn(async move {
237274
let inner = cloned_inner;
238275

239-
// Wait until the server is ready (plugins are started)
240-
time::delay_for(Duration::from_secs(2)).await;
241-
242276
let mut interval = time::interval(Duration::from_secs(DEFAULT_CHECK_INTERVAL_SEC));
243277

244278
while context.server_running() {
279+
// Must be here, wait for the first checking round
245280
interval.tick().await;
246281

247282
assert!(!inner.servers.is_empty());
@@ -257,23 +292,17 @@ impl PingBalancer {
257292
}
258293

259294
let choosen_svr = &inner.servers[svr_idx];
260-
debug!(
261-
"chosen the best server {} (score: {})",
262-
choosen_svr.config.addr(),
263-
choosen_svr.score()
264-
);
265-
266-
if inner.best_idx() != svr_idx {
295+
if choosen_svr.score() != inner.servers[inner.best_idx()].score() {
267296
info!(
268297
"switched server from {} (score: {}) to {} (score: {})",
269298
inner.best_server().config.addr(),
270299
inner.best_server().score(),
271300
choosen_svr.config.addr(),
272301
choosen_svr.score(),
273302
);
274-
}
275303

276-
inner.set_best_idx(svr_idx);
304+
inner.set_best_idx(svr_idx);
305+
}
277306
}
278307
});
279308
}

src/relay/udprelay/server.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
use std::{
44
io::{self, Cursor},
55
net::{IpAddr, Ipv4Addr, SocketAddr},
6-
sync::atomic::{AtomicBool, Ordering},
7-
sync::Arc,
6+
sync::{
7+
atomic::{AtomicBool, Ordering},
8+
Arc,
9+
},
810
time::Duration,
911
};
1012

@@ -57,7 +59,7 @@ impl UdpAssociation {
5759
src_addr: SocketAddr,
5860
mut response_tx: mpsc::Sender<(SocketAddr, BytesMut)>,
5961
) -> io::Result<UdpAssociation> {
60-
debug!("Created UDP Association for {}", src_addr);
62+
debug!("created UDP Association for {}", src_addr);
6163

6264
// Create a socket for receiving packets
6365
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
@@ -82,7 +84,7 @@ impl UdpAssociation {
8284
if let Err(err) =
8385
UdpAssociation::relay_l2r(&*context, src_addr, &mut sender, &pkt[..], timeout, &*c_svr_cfg).await
8486
{
85-
error!("Failed to relay packet, {} -> ..., error: {}", src_addr, err);
87+
error!("failed to relay packet, {} -> ..., error: {}", src_addr, err);
8688

8789
// FIXME: Ignore? Or how to deal with it?
8890
}
@@ -101,10 +103,10 @@ impl UdpAssociation {
101103
match UdpAssociation::relay_r2l(src_addr, &mut receiver, timeout, &mut response_tx, &*svr_cfg).await {
102104
Ok(..) => {}
103105
Err(ref err) if err.kind() == ErrorKind::TimedOut => {
104-
trace!("Receive packet timeout, {} <- ...", src_addr);
106+
trace!("receive packet timeout, {} <- ...", src_addr);
105107
}
106108
Err(err) => {
107-
error!("Failed to receive packet, {} <- .., error: {}", src_addr, err);
109+
error!("railed to receive packet, {} <- .., error: {}", src_addr, err);
108110

109111
// FIXME: Don't break, or if you can find a way to drop the UdpAssociation
110112
// break;
@@ -132,12 +134,12 @@ impl UdpAssociation {
132134
let decrypted_pkt = match decrypt_payload(svr_cfg.method(), svr_cfg.key(), pkt) {
133135
Ok(Some(pkt)) => pkt,
134136
Ok(None) => {
135-
error!("Failed to decrypt pkt in UDP relay, packet too short");
137+
error!("failed to decrypt pkt in UDP relay, packet too short");
136138
let err = io::Error::new(io::ErrorKind::InvalidData, "packet too short");
137139
return Err(err);
138140
}
139141
Err(err) => {
140-
error!("Failed to decrypt pkt in UDP relay: {}", err);
142+
error!("failed to decrypt pkt in UDP relay: {}", err);
141143
let err = io::Error::new(io::ErrorKind::InvalidData, "decrypt failed");
142144
return Err(err);
143145
}
@@ -232,7 +234,7 @@ impl UdpAssociation {
232234

233235
// Send back to src_addr
234236
if let Err(err) = response_tx.send((src_addr, encrypt_buf)).await {
235-
error!("Failed to send packet into response channel, error: {}", err);
237+
error!("failed to send packet into response channel, error: {}", err);
236238

237239
// FIXME: What to do? Ignore?
238240
}
@@ -244,7 +246,7 @@ impl UdpAssociation {
244246
match self.tx.send(pkt.to_vec()).await {
245247
Ok(..) => true,
246248
Err(err) => {
247-
error!("Failed to send packet, error: {}", err);
249+
error!("failed to send packet, error: {}", err);
248250
false
249251
}
250252
}
@@ -290,7 +292,7 @@ async fn listen(context: SharedContext, svr_cfg: Arc<ServerConfig>) -> io::Resul
290292
// Packet length is limited by MAXIMUM_UDP_PAYLOAD_SIZE, excess bytes will be discarded.
291293
let pkt = &pkt_buf[..recv_len];
292294

293-
trace!("Received UDP packet from {}, length {} bytes", src, recv_len);
295+
trace!("received UDP packet from {}, length {} bytes", src, recv_len);
294296

295297
// Check or (re)create an association
296298
loop {
@@ -329,7 +331,7 @@ pub async fn run(context: SharedContext) -> io::Result<()> {
329331

330332
match vec_fut.into_future().await.0 {
331333
Some(res) => {
332-
error!("One of TCP servers exited unexpectly, result: {:?}", res);
334+
error!("one of TCP servers exited unexpectly, result: {:?}", res);
333335
let err = io::Error::new(io::ErrorKind::Other, "server exited unexpectly");
334336
Err(err)
335337
}

src/relay/udprelay/socks5_local.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
use std::{
44
io::{self, Cursor, ErrorKind, Read},
55
net::{IpAddr, Ipv4Addr, SocketAddr},
6-
sync::atomic::{AtomicBool, Ordering},
7-
sync::Arc,
6+
sync::{
7+
atomic::{AtomicBool, Ordering},
8+
Arc,
9+
},
810
time::Duration,
911
};
1012

@@ -45,7 +47,7 @@ async fn parse_packet(pkt: &[u8]) -> io::Result<(Address, Vec<u8>)> {
4547
let header = UdpAssociateHeader::read_from(&mut cur).await?;
4648

4749
if header.frag != 0 {
48-
error!("Received UDP associate with frag != 0, which is not supported by ShadowSocks");
50+
error!("received UDP associate with frag != 0, which is not supported by ShadowSocks");
4951
let err = io::Error::new(ErrorKind::Other, "unsupported UDP fragmentation");
5052
return Err(err);
5153
}
@@ -81,7 +83,7 @@ impl UdpAssociation {
8183
src_addr: SocketAddr,
8284
mut response_tx: mpsc::Sender<(SocketAddr, Vec<u8>)>,
8385
) -> io::Result<UdpAssociation> {
84-
debug!("Created UDP Association for {}", src_addr);
86+
debug!("created UDP Association for {}", src_addr);
8587

8688
// Create a socket for receiving packets
8789
let local_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
@@ -106,7 +108,7 @@ impl UdpAssociation {
106108
if let Err(err) =
107109
UdpAssociation::relay_l2r(&*context, src_addr, &mut sender, &pkt[..], timeout, &*c_svr_cfg).await
108110
{
109-
error!("Failed to send packet {} -> ..., error: {}", src_addr, err);
111+
error!("failed to send packet {} -> ..., error: {}", src_addr, err);
110112

111113
// FIXME: Ignore? Or how to deal with it?
112114
}
@@ -123,10 +125,10 @@ impl UdpAssociation {
123125
match UdpAssociation::relay_r2l(src_addr, &mut receiver, timeout, &mut response_tx, &*svr_cfg).await {
124126
Ok(..) => {}
125127
Err(ref err) if err.kind() == ErrorKind::TimedOut => {
126-
trace!("Receive packet timeout, {} <- ...", src_addr);
128+
trace!("receive packet timeout, {} <- ...", src_addr);
127129
}
128130
Err(err) => {
129-
error!("Failed to receive packet, {} <- .., error: {}", src_addr, err);
131+
error!("failed to receive packet, {} <- .., error: {}", src_addr, err);
130132

131133
// FIXME: Don't break, or if you can find a way to drop the UdpAssociation
132134
// break;
@@ -238,7 +240,7 @@ impl UdpAssociation {
238240

239241
// Send back to src_addr
240242
if let Err(err) = response_tx.send((src_addr, payload)).await {
241-
error!("Failed to send packet into response channel, error: {}", err);
243+
error!("failed to send packet into response channel, error: {}", err);
242244

243245
// FIXME: What to do? Ignore?
244246
}
@@ -250,7 +252,7 @@ impl UdpAssociation {
250252
match self.tx.send(pkt.to_vec()).await {
251253
Ok(..) => true,
252254
Err(err) => {
253-
error!("Failed to send packet, error: {}", err);
255+
error!("failed to send packet, error: {}", err);
254256
false
255257
}
256258
}
@@ -295,7 +297,7 @@ async fn listen(context: SharedContext, l: UdpSocket) -> io::Result<()> {
295297
// Copy bytes, because udp_associate runs in another tokio Task
296298
let pkt = &pkt_buf[..recv_len];
297299

298-
trace!("Received UDP packet from {}, length {} bytes", src, recv_len);
300+
trace!("received UDP packet from {}, length {} bytes", src, recv_len);
299301

300302
// Pick a server
301303
let svr_cfg = balancer.pick_server();

0 commit comments

Comments
 (0)