Skip to content

Commit 9ce69e5

Browse files
committed
killing UDP association relay tasks properly when server is closed
- ref #557 - ref #271
1 parent 6490e34 commit 9ce69e5

File tree

3 files changed

+117
-31
lines changed

3 files changed

+117
-31
lines changed

crates/shadowsocks-service/src/local/net/udp/association.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ where
5454
context: Arc<ServiceContext>,
5555
assoc_map: SharedAssociationMap<W>,
5656
cleanup_abortable: AbortHandle,
57+
keepalive_abortable: AbortHandle,
58+
keepalive_tx: mpsc::Sender<SocketAddr>,
5759
balancer: PingBalancer,
5860
}
5961

@@ -63,6 +65,7 @@ where
6365
{
6466
fn drop(&mut self) {
6567
self.cleanup_abortable.abort();
68+
self.keepalive_abortable.abort();
6669
}
6770
}
6871

@@ -98,11 +101,26 @@ where
98101
cleanup_abortable
99102
};
100103

104+
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
105+
106+
let keepalive_abortable = {
107+
let assoc_map = assoc_map.clone();
108+
let (keepalive_task, keepalive_abortable) = future::abortable(async move {
109+
while let Some(peer_addr) = keepalive_rx.recv().await {
110+
assoc_map.lock().await.get(&peer_addr);
111+
}
112+
});
113+
tokio::spawn(keepalive_task);
114+
keepalive_abortable
115+
};
116+
101117
UdpAssociationManager {
102118
respond_writer,
103119
context,
104120
assoc_map,
105121
cleanup_abortable,
122+
keepalive_abortable,
123+
keepalive_tx,
106124
balancer,
107125
}
108126
}
@@ -120,7 +138,7 @@ where
120138
let assoc = UdpAssociation::new(
121139
self.context.clone(),
122140
peer_addr,
123-
self.assoc_map.clone(),
141+
self.keepalive_tx.clone(),
124142
self.balancer.clone(),
125143
self.respond_writer.clone(),
126144
);
@@ -160,11 +178,11 @@ where
160178
fn new(
161179
context: Arc<ServiceContext>,
162180
peer_addr: SocketAddr,
163-
assoc_map: SharedAssociationMap<W>,
181+
keepalive_tx: mpsc::Sender<SocketAddr>,
164182
balancer: PingBalancer,
165183
respond_writer: W,
166184
) -> UdpAssociation<W> {
167-
let (assoc, sender) = UdpAssociationContext::new(context, peer_addr, assoc_map, balancer, respond_writer);
185+
let (assoc, sender) = UdpAssociationContext::new(context, peer_addr, keepalive_tx, balancer, respond_writer);
168186
UdpAssociation { assoc, sender }
169187
}
170188

@@ -219,9 +237,7 @@ enum UdpAssociationProxyState {
219237

220238
impl Drop for UdpAssociationProxyState {
221239
fn drop(&mut self) {
222-
if let UdpAssociationProxyState::Connected { ref abortable, .. } = *self {
223-
abortable.abort();
224-
}
240+
self.abort_inner();
225241
}
226242
}
227243

@@ -231,16 +247,25 @@ impl UdpAssociationProxyState {
231247
}
232248

233249
fn reset(&mut self) {
250+
self.abort_inner();
234251
*self = UdpAssociationProxyState::Empty;
235252
}
236253

237254
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: AbortHandle) {
255+
self.abort_inner();
238256
*self = UdpAssociationProxyState::Connected { socket, abortable };
239257
}
240258

241259
fn abort(&mut self) {
260+
self.abort_inner();
242261
*self = UdpAssociationProxyState::Aborted;
243262
}
263+
264+
fn abort_inner(&mut self) {
265+
if let UdpAssociationProxyState::Connected { ref abortable, .. } = *self {
266+
abortable.abort();
267+
}
268+
}
244269
}
245270

246271
struct UdpAssociationContext<W>
@@ -252,7 +277,7 @@ where
252277
bypassed_ipv4_socket: SpinMutex<UdpAssociationBypassState>,
253278
bypassed_ipv6_socket: SpinMutex<UdpAssociationBypassState>,
254279
proxied_socket: SpinMutex<UdpAssociationProxyState>,
255-
assoc_map: SharedAssociationMap<W>,
280+
keepalive_tx: mpsc::Sender<SocketAddr>,
256281
balancer: PingBalancer,
257282
respond_writer: W,
258283
}
@@ -273,7 +298,7 @@ where
273298
fn new(
274299
context: Arc<ServiceContext>,
275300
peer_addr: SocketAddr,
276-
assoc_map: SharedAssociationMap<W>,
301+
keepalive_tx: mpsc::Sender<SocketAddr>,
277302
balancer: PingBalancer,
278303
respond_writer: W,
279304
) -> (Arc<UdpAssociationContext<W>>, mpsc::Sender<(Address, Bytes)>) {
@@ -288,7 +313,7 @@ where
288313
bypassed_ipv4_socket: SpinMutex::new(UdpAssociationBypassState::empty()),
289314
bypassed_ipv6_socket: SpinMutex::new(UdpAssociationBypassState::empty()),
290315
proxied_socket: SpinMutex::new(UdpAssociationProxyState::empty()),
291-
assoc_map,
316+
keepalive_tx,
292317
balancer,
293318
respond_writer,
294319
});
@@ -557,7 +582,10 @@ where
557582
n
558583
);
559584
// Keep association alive in map
560-
let _ = self.assoc_map.lock().await.get(&self.peer_addr);
585+
let _ = self
586+
.keepalive_tx
587+
.send_timeout(self.peer_addr, Duration::from_secs(1))
588+
.await;
561589
(n, addr)
562590
}
563591
Err(err) => {
@@ -608,7 +636,10 @@ where
608636
n
609637
);
610638
// Keep association alive in map
611-
let _ = self.assoc_map.lock().await.get(&self.peer_addr);
639+
let _ = self
640+
.keepalive_tx
641+
.send_timeout(self.peer_addr, Duration::from_secs(1))
642+
.await;
612643
(n, addr)
613644
}
614645
Err(err) => {

crates/shadowsocks-service/src/local/tunnel/udprelay.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ pub struct UdpTunnel {
3535
context: Arc<ServiceContext>,
3636
assoc_map: SharedAssociationMap,
3737
cleanup_abortable: AbortHandle,
38+
keepalive_abortable: AbortHandle,
39+
keepalive_tx: mpsc::Sender<SocketAddr>,
3840
}
3941

4042
impl Drop for UdpTunnel {
4143
fn drop(&mut self) {
4244
self.cleanup_abortable.abort();
45+
self.keepalive_abortable.abort();
4346
}
4447
}
4548

@@ -65,10 +68,25 @@ impl UdpTunnel {
6568
cleanup_abortable
6669
};
6770

71+
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
72+
73+
let keepalive_abortable = {
74+
let assoc_map = assoc_map.clone();
75+
let (keepalive_task, keepalive_abortable) = future::abortable(async move {
76+
while let Some(peer_addr) = keepalive_rx.recv().await {
77+
assoc_map.lock().await.get(&peer_addr);
78+
}
79+
});
80+
tokio::spawn(keepalive_task);
81+
keepalive_abortable
82+
};
83+
6884
UdpTunnel {
6985
context,
7086
assoc_map,
7187
cleanup_abortable,
88+
keepalive_abortable,
89+
keepalive_tx,
7290
}
7391
}
7492

@@ -139,7 +157,7 @@ impl UdpTunnel {
139157
listener.clone(),
140158
peer_addr,
141159
forward_addr.clone(),
142-
self.assoc_map.clone(),
160+
self.keepalive_tx.clone(),
143161
balancer.clone(),
144162
);
145163

@@ -169,11 +187,11 @@ impl UdpAssociation {
169187
inbound: Arc<UdpSocket>,
170188
peer_addr: SocketAddr,
171189
forward_addr: Address,
172-
assoc_map: SharedAssociationMap,
190+
keepalive_tx: mpsc::Sender<SocketAddr>,
173191
balancer: PingBalancer,
174192
) -> UdpAssociation {
175193
let (assoc, sender) =
176-
UdpAssociationContext::new(context, inbound, peer_addr, forward_addr, assoc_map, balancer);
194+
UdpAssociationContext::new(context, inbound, peer_addr, forward_addr, keepalive_tx, balancer);
177195
UdpAssociation { sender, assoc }
178196
}
179197

@@ -197,9 +215,7 @@ enum UdpAssociationState {
197215

198216
impl Drop for UdpAssociationState {
199217
fn drop(&mut self) {
200-
if let UdpAssociationState::Connected { ref abortable, .. } = *self {
201-
abortable.abort();
202-
}
218+
self.abort_inner();
203219
}
204220
}
205221

@@ -209,16 +225,25 @@ impl UdpAssociationState {
209225
}
210226

211227
fn reset(&mut self) {
228+
self.abort_inner();
212229
*self = UdpAssociationState::Empty;
213230
}
214231

215232
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: AbortHandle) {
233+
self.abort_inner();
216234
*self = UdpAssociationState::Connected { socket, abortable };
217235
}
218236

219237
fn abort(&mut self) {
238+
self.abort_inner();
220239
*self = UdpAssociationState::Aborted;
221240
}
241+
242+
fn abort_inner(&mut self) {
243+
if let UdpAssociationState::Connected { ref abortable, .. } = *self {
244+
abortable.abort();
245+
}
246+
}
222247
}
223248

224249
struct UdpAssociationContext {
@@ -227,7 +252,7 @@ struct UdpAssociationContext {
227252
peer_addr: SocketAddr,
228253
forward_addr: Address,
229254
proxied_socket: SpinMutex<UdpAssociationState>,
230-
assoc_map: SharedAssociationMap,
255+
keepalive_tx: mpsc::Sender<SocketAddr>,
231256
balancer: PingBalancer,
232257
}
233258

@@ -243,7 +268,7 @@ impl UdpAssociationContext {
243268
inbound: Arc<UdpSocket>,
244269
peer_addr: SocketAddr,
245270
forward_addr: Address,
246-
assoc_map: SharedAssociationMap,
271+
keepalive_tx: mpsc::Sender<SocketAddr>,
247272
balancer: PingBalancer,
248273
) -> (Arc<UdpAssociationContext>, mpsc::Sender<Bytes>) {
249274
// Pending packets 1024 should be good enough for a server.
@@ -257,7 +282,7 @@ impl UdpAssociationContext {
257282
peer_addr,
258283
forward_addr,
259284
proxied_socket: SpinMutex::new(UdpAssociationState::empty()),
260-
assoc_map,
285+
keepalive_tx,
261286
balancer,
262287
});
263288

@@ -374,7 +399,10 @@ impl UdpAssociationContext {
374399
Ok((n, addr)) => {
375400
trace!("udp relay {} <- {} received {} bytes", self.peer_addr, addr, n);
376401
// Keep association alive in map
377-
let _ = self.assoc_map.lock().await.get(&self.peer_addr);
402+
let _ = self
403+
.keepalive_tx
404+
.send_timeout(self.peer_addr, Duration::from_secs(1))
405+
.await;
378406
n
379407
}
380408
Err(err) => {

0 commit comments

Comments
 (0)