Skip to content

Commit 9ab1f8b

Browse files
committed
aborting tasks with JoinHandle instead of futures abortable
1 parent 250aa95 commit 9ab1f8b

File tree

9 files changed

+78
-109
lines changed

9 files changed

+78
-109
lines changed

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::{
44
fmt::{self, Debug, Display},
5-
future::Future,
65
io,
76
net::{Ipv4Addr, SocketAddr},
87
sync::{
@@ -13,7 +12,7 @@ use std::{
1312
};
1413

1514
use byte_string::ByteStr;
16-
use futures::future::{self, AbortHandle};
15+
use futures::future;
1716
use log::{debug, info, trace};
1817
use shadowsocks::{
1918
config::Mode,
@@ -26,6 +25,7 @@ use shadowsocks::{
2625
};
2726
use tokio::{
2827
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
28+
task::JoinHandle,
2929
time,
3030
};
3131

@@ -72,7 +72,7 @@ impl PingBalancerBuilder {
7272
self.servers.push(Arc::new(server));
7373
}
7474

75-
pub async fn build(self) -> (PingBalancer, impl Future<Output = ()>) {
75+
pub async fn build(self) -> PingBalancer {
7676
assert!(!self.servers.is_empty(), "build PingBalancer without any servers");
7777

7878
let balancer_context = PingBalancerContext {
@@ -87,21 +87,17 @@ impl PingBalancerBuilder {
8787

8888
let shared_context = Arc::new(balancer_context);
8989

90-
let (checker, abortable) = {
90+
let abortable = {
9191
let shared_context = shared_context.clone();
92-
future::abortable(async move { shared_context.checker_task().await })
93-
};
94-
let checker = async move {
95-
let _ = checker.await;
92+
tokio::spawn(async move { shared_context.checker_task().await })
9693
};
9794

98-
let balancer = PingBalancer {
95+
PingBalancer {
9996
inner: Arc::new(PingBalancerInner {
10097
context: shared_context,
10198
abortable,
10299
}),
103-
};
104-
(balancer, checker)
100+
}
105101
}
106102
}
107103

@@ -260,7 +256,7 @@ impl PingBalancerContext {
260256

261257
struct PingBalancerInner {
262258
context: Arc<PingBalancerContext>,
263-
abortable: AbortHandle,
259+
abortable: JoinHandle<()>,
264260
}
265261

266262
impl Drop for PingBalancerInner {

crates/shadowsocks-service/src/local/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,7 @@ pub async fn run(mut config: Config) -> io::Result<()> {
189189
for server in config.server {
190190
balancer_builder.add_server(ServerIdent::new(server));
191191
}
192-
let (balancer, checker) = balancer_builder.build().await;
193-
tokio::spawn(checker);
194-
195-
balancer
192+
balancer_builder.build().await
196193
};
197194

198195
#[cfg(feature = "local-flow-stat")]

crates/shadowsocks-service/src/local/net/udp/association.rs

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::{
99

1010
use async_trait::async_trait;
1111
use bytes::Bytes;
12-
use futures::future::{self, AbortHandle};
1312
use log::{debug, error, trace, warn};
1413
use lru_time_cache::LruCache;
1514
use shadowsocks::{
@@ -24,6 +23,7 @@ use spin::Mutex as SpinMutex;
2423
use tokio::{
2524
net::UdpSocket,
2625
sync::{mpsc, Mutex},
26+
task::JoinHandle,
2727
time,
2828
};
2929

@@ -53,8 +53,8 @@ where
5353
respond_writer: W,
5454
context: Arc<ServiceContext>,
5555
assoc_map: SharedAssociationMap<W>,
56-
cleanup_abortable: AbortHandle,
57-
keepalive_abortable: AbortHandle,
56+
cleanup_abortable: JoinHandle<()>,
57+
keepalive_abortable: JoinHandle<()>,
5858
keepalive_tx: mpsc::Sender<SocketAddr>,
5959
balancer: PingBalancer,
6060
}
@@ -89,29 +89,25 @@ where
8989

9090
let cleanup_abortable = {
9191
let assoc_map = assoc_map.clone();
92-
let (cleanup_task, cleanup_abortable) = future::abortable(async move {
92+
tokio::spawn(async move {
9393
loop {
9494
time::sleep(time_to_live).await;
9595

9696
// cleanup expired associations. iter() will remove expired elements
9797
let _ = assoc_map.lock().await.iter();
9898
}
99-
});
100-
tokio::spawn(cleanup_task);
101-
cleanup_abortable
99+
})
102100
};
103101

104-
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
102+
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64);
105103

106104
let keepalive_abortable = {
107105
let assoc_map = assoc_map.clone();
108-
let (keepalive_task, keepalive_abortable) = future::abortable(async move {
106+
tokio::spawn(async move {
109107
while let Some(peer_addr) = keepalive_rx.recv().await {
110108
assoc_map.lock().await.get(&peer_addr);
111109
}
112-
});
113-
tokio::spawn(keepalive_task);
114-
keepalive_abortable
110+
})
115111
};
116112

117113
UdpAssociationManager {
@@ -199,7 +195,7 @@ enum UdpAssociationBypassState {
199195
Empty,
200196
Connected {
201197
socket: Arc<UdpSocket>,
202-
abortable: AbortHandle,
198+
abortable: JoinHandle<io::Result<()>>,
203199
},
204200
Aborted,
205201
}
@@ -217,7 +213,7 @@ impl UdpAssociationBypassState {
217213
UdpAssociationBypassState::Empty
218214
}
219215

220-
fn set_connected(&mut self, socket: Arc<UdpSocket>, abortable: AbortHandle) {
216+
fn set_connected(&mut self, socket: Arc<UdpSocket>, abortable: JoinHandle<io::Result<()>>) {
221217
*self = UdpAssociationBypassState::Connected { socket, abortable };
222218
}
223219

@@ -230,7 +226,7 @@ enum UdpAssociationProxyState {
230226
Empty,
231227
Connected {
232228
socket: Arc<MonProxySocket>,
233-
abortable: AbortHandle,
229+
abortable: JoinHandle<io::Result<()>>,
234230
},
235231
Aborted,
236232
}
@@ -251,7 +247,7 @@ impl UdpAssociationProxyState {
251247
*self = UdpAssociationProxyState::Empty;
252248
}
253249

254-
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: AbortHandle) {
250+
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: JoinHandle<io::Result<()>>) {
255251
self.abort_inner();
256252
*self = UdpAssociationProxyState::Connected { socket, abortable };
257253
}
@@ -394,13 +390,12 @@ where
394390
ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
395391
let socket: Arc<UdpSocket> = Arc::new(socket.into());
396392

397-
let (r2l_fut, r2l_abortable) = {
393+
// CLIENT <- REMOTE
394+
let r2l_abortable = {
398395
let assoc = self.clone();
399-
future::abortable(assoc.copy_bypassed_r2l(socket.clone()))
396+
tokio::spawn(assoc.copy_bypassed_r2l(socket.clone()))
400397
};
401398

402-
// CLIENT <- REMOTE
403-
tokio::spawn(r2l_fut);
404399
debug!(
405400
"created udp association for {} (bypassed) with {:?}",
406401
self.peer_addr,
@@ -449,13 +444,12 @@ where
449444
ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
450445
let socket: Arc<UdpSocket> = Arc::new(socket.into());
451446

452-
let (r2l_fut, r2l_abortable) = {
447+
// CLIENT <- REMOTE
448+
let r2l_abortable = {
453449
let assoc = self.clone();
454-
future::abortable(assoc.copy_bypassed_r2l(socket.clone()))
450+
tokio::spawn(assoc.copy_bypassed_r2l(socket.clone()))
455451
};
456452

457-
// CLIENT <- REMOTE
458-
tokio::spawn(r2l_fut);
459453
debug!(
460454
"created udp association for {} (bypassed) with {:?}",
461455
self.peer_addr,
@@ -515,14 +509,12 @@ where
515509
let socket = MonProxySocket::from_socket(socket, self.context.flow_stat());
516510
let socket = Arc::new(socket);
517511

518-
let (r2l_fut, r2l_abortable) = {
512+
// CLIENT <- REMOTE
513+
let r2l_abortable = {
519514
let assoc = self.clone();
520-
future::abortable(assoc.copy_proxied_r2l(socket.clone()))
515+
tokio::spawn(assoc.copy_proxied_r2l(socket.clone()))
521516
};
522517

523-
// CLIENT <- REMOTE
524-
tokio::spawn(r2l_fut);
525-
526518
debug!(
527519
"created udp association for {} <-> {} (proxied) with {:?}",
528520
self.peer_addr,

crates/shadowsocks-service/src/local/redir/udprelay/mod.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use std::{
88
};
99

1010
use async_trait::async_trait;
11-
use futures::future::{self, AbortHandle};
1211
use log::{error, info, trace, warn};
1312
use lru_time_cache::LruCache;
1413
use shadowsocks::{
@@ -17,7 +16,7 @@ use shadowsocks::{
1716
relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE},
1817
ServerAddr,
1918
};
20-
use tokio::sync::Mutex;
19+
use tokio::{sync::Mutex, task::JoinHandle};
2120

2221
use crate::{
2322
config::RedirType,
@@ -41,7 +40,7 @@ const INBOUND_SOCKET_CACHE_CAPACITY: usize = 256;
4140

4241
struct UdpRedirInboundCache {
4342
cache: Arc<Mutex<LruCache<SocketAddr, Arc<UdpRedirSocket>>>>,
44-
watcher: AbortHandle,
43+
watcher: JoinHandle<()>,
4544
}
4645

4746
impl Drop for UdpRedirInboundCache {
@@ -57,16 +56,15 @@ impl UdpRedirInboundCache {
5756
INBOUND_SOCKET_CACHE_CAPACITY,
5857
)));
5958

60-
let (cleanup_fut, watcher) = {
59+
let watcher = {
6160
let cache = cache.clone();
62-
future::abortable(async move {
61+
tokio::spawn(async move {
6362
loop {
6463
tokio::time::sleep(INBOUND_SOCKET_CACHE_EXPIRATION).await;
6564
let _ = cache.lock().await.iter();
6665
}
6766
})
6867
};
69-
tokio::spawn(cleanup_fut);
7068

7169
UdpRedirInboundCache { cache, watcher }
7270
}

0 commit comments

Comments
 (0)