From e468d5d023f87ba9538425a651cc3b6025cb2819 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Tue, 13 May 2025 20:41:56 +0700 Subject: [PATCH 1/3] fix: conv is not async and update sessions map --- src/listener.rs | 2 +- src/session.rs | 60 +++++++++++++++++-------------------------------- 2 files changed, 22 insertions(+), 40 deletions(-) diff --git a/src/listener.rs b/src/listener.rs index c931584..ed11cc9 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -99,7 +99,7 @@ impl KcpListener { continue; } } else { - let session_conv = s.conv().await; + let session_conv = s.conv(); if session_conv != conv { debug!("received peer: {} with conv: {} not match with session conv: {}", peer_addr, diff --git a/src/session.rs b/src/session.rs index b327fb9..c9901e1 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap}, + collections::HashMap, fmt::{self, Debug}, net::SocketAddr, ops::Deref, @@ -263,7 +263,7 @@ impl KcpSession { self.input_tx.send(buf.to_owned()).await.map_err(|_| SessionClosedError) } - pub async fn conv(&self) -> u32 { + pub fn conv(&self) -> u32 { let socket = self.socket.lock(); socket.conv() } @@ -324,46 +324,28 @@ impl KcpSessionManager { peer_addr: SocketAddr, session_close_notifier: &mpsc::Sender, ) -> KcpResult<(Arc, bool)> { - match self.sessions.entry(peer_addr) { - Entry::Occupied(mut occ) => { - let session = occ.get(); - - if sn == 0 && session.conv().await != conv { - // This is the first packet received from this peer. - // Recreate a new session for this specific client. - - let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; - let session = KcpSession::new_shared( - socket, - config.session_expire, - Some((session_close_notifier.clone(), peer_addr)), - ); - - let old_session = occ.insert(KcpSessionUniq(session.clone())); - let old_conv = old_session.conv().await; - trace!( - "replaced session with conv: {} (old: {}), peer: {}", - conv, - old_conv, - peer_addr - ); - - Ok((session, true)) - } else { - Ok((session.0.clone(), false)) - } - } - Entry::Vacant(vac) => { - let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; - let session = KcpSession::new_shared( - socket, - config.session_expire, - Some((session_close_notifier.clone(), peer_addr)), + let old_conv_id = self.sessions.get(&peer_addr).map(|s| s.conv()); + let update = old_conv_id.is_none() || (sn ==0 && old_conv_id.unwrap() != conv); + if old_conv_id.is_none() || (sn ==0 && old_conv_id.unwrap() != conv) { + let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; + let session = KcpSession::new_shared( + socket, + config.session_expire, + Some((session_close_notifier.clone(), peer_addr)), + ); + if let Some(old_conv) = old_conv_id { + trace!( + "replaced session with conv: {} (old: {}), peer: {}", + conv, + old_conv, + peer_addr ); + } else { trace!("created session for conv: {}, peer: {}", conv, peer_addr); - vac.insert(KcpSessionUniq(session.clone())); - Ok((session, true)) } + self.sessions.entry(peer_addr).insert_entry(KcpSessionUniq(session)); } + Ok((self.sessions.get(&peer_addr).unwrap().0.clone(), update)) + } } From cb075d324aaefb3803c5986137b9c96ffe9fc4c7 Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Wed, 14 May 2025 13:06:04 +0700 Subject: [PATCH 2/3] fix: for API consistency keep conv async --- src/listener.rs | 2 +- src/session.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/listener.rs b/src/listener.rs index ed11cc9..c931584 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -99,7 +99,7 @@ impl KcpListener { continue; } } else { - let session_conv = s.conv(); + let session_conv = s.conv().await; if session_conv != conv { debug!("received peer: {} with conv: {} not match with session conv: {}", peer_addr, diff --git a/src/session.rs b/src/session.rs index c9901e1..c4a2d0d 100644 --- a/src/session.rs +++ b/src/session.rs @@ -263,7 +263,7 @@ impl KcpSession { self.input_tx.send(buf.to_owned()).await.map_err(|_| SessionClosedError) } - pub fn conv(&self) -> u32 { + pub async fn conv(&self) -> u32 { let socket = self.socket.lock(); socket.conv() } @@ -324,7 +324,11 @@ impl KcpSessionManager { peer_addr: SocketAddr, session_close_notifier: &mpsc::Sender, ) -> KcpResult<(Arc, bool)> { - let old_conv_id = self.sessions.get(&peer_addr).map(|s| s.conv()); + let old_conv_id = if let Some(session) = self.sessions.get(&peer_addr) { + Some(session.conv().await) + } else { + None + }; let update = old_conv_id.is_none() || (sn ==0 && old_conv_id.unwrap() != conv); if old_conv_id.is_none() || (sn ==0 && old_conv_id.unwrap() != conv) { let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; From 997d6d72a2dedba6b99010376fe3b207da14632f Mon Sep 17 00:00:00 2001 From: harsh-98 Date: Thu, 15 May 2025 18:53:30 +0700 Subject: [PATCH 3/3] fix: formatting --- src/listener.rs | 5 +++-- src/session.rs | 5 ++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/listener.rs b/src/listener.rs index c931584..9b39793 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -151,8 +151,9 @@ impl KcpListener { pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { self.accept_rx.poll_recv(cx).map(|op_res| { - op_res - .ok_or_else(|| KcpError::IoError(io::Error::new(ErrorKind::Other, "accept channel closed unexpectedly"))) + op_res.ok_or_else(|| { + KcpError::IoError(io::Error::new(ErrorKind::Other, "accept channel closed unexpectedly")) + }) }) } diff --git a/src/session.rs b/src/session.rs index c4a2d0d..e2032a6 100644 --- a/src/session.rs +++ b/src/session.rs @@ -329,8 +329,8 @@ impl KcpSessionManager { } else { None }; - let update = old_conv_id.is_none() || (sn ==0 && old_conv_id.unwrap() != conv); - if old_conv_id.is_none() || (sn ==0 && old_conv_id.unwrap() != conv) { + let update = old_conv_id.is_none() || (sn == 0 && old_conv_id.unwrap() != conv); + if old_conv_id.is_none() || (sn == 0 && old_conv_id.unwrap() != conv) { let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; let session = KcpSession::new_shared( socket, @@ -350,6 +350,5 @@ impl KcpSessionManager { self.sessions.entry(peer_addr).insert_entry(KcpSessionUniq(session)); } Ok((self.sessions.get(&peer_addr).unwrap().0.clone(), update)) - } }