diff --git a/src/listener.rs b/src/listener.rs index c931584..9b39793 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -151,8 +151,9 @@ impl KcpListener { pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll> { self.accept_rx.poll_recv(cx).map(|op_res| { - op_res - .ok_or_else(|| KcpError::IoError(io::Error::new(ErrorKind::Other, "accept channel closed unexpectedly"))) + op_res.ok_or_else(|| { + KcpError::IoError(io::Error::new(ErrorKind::Other, "accept channel closed unexpectedly")) + }) }) } diff --git a/src/session.rs b/src/session.rs index b327fb9..e2032a6 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,5 +1,5 @@ use std::{ - collections::{hash_map::Entry, HashMap}, + collections::HashMap, fmt::{self, Debug}, net::SocketAddr, ops::Deref, @@ -324,46 +324,31 @@ impl KcpSessionManager { peer_addr: SocketAddr, session_close_notifier: &mpsc::Sender, ) -> KcpResult<(Arc, bool)> { - match self.sessions.entry(peer_addr) { - Entry::Occupied(mut occ) => { - let session = occ.get(); - - if sn == 0 && session.conv().await != conv { - // This is the first packet received from this peer. - // Recreate a new session for this specific client. - - let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; - let session = KcpSession::new_shared( - socket, - config.session_expire, - Some((session_close_notifier.clone(), peer_addr)), - ); - - let old_session = occ.insert(KcpSessionUniq(session.clone())); - let old_conv = old_session.conv().await; - trace!( - "replaced session with conv: {} (old: {}), peer: {}", - conv, - old_conv, - peer_addr - ); - - Ok((session, true)) - } else { - Ok((session.0.clone(), false)) - } - } - Entry::Vacant(vac) => { - let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; - let session = KcpSession::new_shared( - socket, - config.session_expire, - Some((session_close_notifier.clone(), peer_addr)), + let old_conv_id = if let Some(session) = self.sessions.get(&peer_addr) { + Some(session.conv().await) + } else { + None + }; + let update = old_conv_id.is_none() || (sn == 0 && old_conv_id.unwrap() != conv); + if old_conv_id.is_none() || (sn == 0 && old_conv_id.unwrap() != conv) { + let socket = KcpSocket::new(config, conv, udp.clone(), peer_addr, config.stream)?; + let session = KcpSession::new_shared( + socket, + config.session_expire, + Some((session_close_notifier.clone(), peer_addr)), + ); + if let Some(old_conv) = old_conv_id { + trace!( + "replaced session with conv: {} (old: {}), peer: {}", + conv, + old_conv, + peer_addr ); + } else { trace!("created session for conv: {}, peer: {}", conv, peer_addr); - vac.insert(KcpSessionUniq(session.clone())); - Ok((session, true)) } + self.sessions.entry(peer_addr).insert_entry(KcpSessionUniq(session)); } + Ok((self.sessions.get(&peer_addr).unwrap().0.clone(), update)) } }