Skip to content

Commit 522a1af

Browse files
committed
fix: Make join work on UDP package loss
- Client retries the handshake operation until receives response from server or times out. - Make server and client work on all network interfaces instead of just localhost
1 parent bfb53ba commit 522a1af

File tree

5 files changed

+139
-98
lines changed

5 files changed

+139
-98
lines changed

src/app.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,6 @@ impl<'a> App<'a> {
7676
}
7777

7878
fn run(&mut self, event_loop: &mut EventLoop<()>) {
79-
const MAX_LOGIC_UPDATE_PER_SECOND: f32 = 60.0;
80-
const FIXED_UPDATE_TIMESTEP: f32 = 1.0 / MAX_LOGIC_UPDATE_PER_SECOND;
81-
8279
let mut previous_time = std::time::Instant::now();
8380
let mut lag: f32 = 0.0;
8481
loop {
@@ -95,9 +92,9 @@ impl<'a> App<'a> {
9592
self.process_server_response();
9693
}
9794

98-
while lag >= FIXED_UPDATE_TIMESTEP {
95+
while lag >= globals::FIXED_UPDATE_TIMESTEP_SEC {
9996
self.update();
100-
lag -= FIXED_UPDATE_TIMESTEP;
97+
lag -= globals::FIXED_UPDATE_TIMESTEP_SEC;
10198
}
10299

103100
self.window.as_ref().unwrap().request_redraw();
@@ -116,20 +113,22 @@ impl<'a> App<'a> {
116113
.client_session
117114
.as_mut()
118115
.unwrap()
119-
.receive_server_resposne()
116+
.receive_server_response()
120117
{
121118
message::trace(format!("Received: {}", msg));
122119
match Message::deserialize(&msg) {
123120
Ok(Message::Replicate(new_player)) => {
124-
self.remote_players.insert(new_player.id, new_player);
125-
self.gui
126-
.as_mut()
127-
.unwrap()
128-
.log(format!("Player {} has joined the server", new_player.id));
129-
}
130-
Ok(Message::Position(id, pos)) => {
131-
if let Some(player) = self.remote_players.get_mut(&id) {
132-
player.pos = pos;
121+
if let Some(player) = self.remote_players.get_mut(&new_player.id) {
122+
// Update existing player based on server's simulation
123+
player.pos = new_player.pos;
124+
} else {
125+
// On-demand remote player creation because replication does not
126+
// fit into the handshake ACK message.
127+
self.remote_players.insert(new_player.id, new_player);
128+
self.gui
129+
.as_mut()
130+
.unwrap()
131+
.log(format!("Player {} has joined the server", new_player.id));
133132
}
134133
}
135134
Ok(Message::Leave(id)) => {
@@ -188,7 +187,7 @@ impl<'a> App<'a> {
188187
}
189188
}
190189
}
191-
Some(_) => (), // Task is still running
190+
Some(_) => (), // Task is still running, nothing to do
192191
None => {
193192
// Fire task if not exists
194193
let server_address = server_address.clone();

src/client.rs

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ pub type ClientSessionResult = Result<ClientSession, Box<dyn Error + Send + Sync
2828

2929
impl ClientSession {
3030
pub async fn new(server_address: String) -> ClientSessionResult {
31-
match tokio::time::timeout(globals::CONNECTION_TIMEOUT_DURATION, async {
32-
let client_socket = UdpSocket::bind("127.0.0.1:0").await?;
31+
match tokio::time::timeout(globals::CONNECTION_TIMEOUT_SEC, async {
32+
let client_socket = UdpSocket::bind("0.0.0.0:0").await?;
3333
let client_socket = Arc::new(client_socket);
3434

3535
let session_player = join_server(&client_socket, &server_address).await?;
@@ -58,7 +58,7 @@ impl ClientSession {
5858
Ok(client_session) => client_session,
5959
Err(_) => Err(format!(
6060
"Connection timed out after {} seconds.",
61-
globals::CONNECTION_TIMEOUT_DURATION.as_secs()
61+
globals::CONNECTION_TIMEOUT_SEC.as_secs()
6262
)
6363
.into()),
6464
}
@@ -68,7 +68,7 @@ impl ClientSession {
6868
self.session_player
6969
}
7070

71-
pub fn receive_server_resposne(&mut self) -> Result<String, TryRecvError> {
71+
pub fn receive_server_response(&mut self) -> Result<String, TryRecvError> {
7272
match self.listen_rx.try_recv() {
7373
Ok(response) => {
7474
if let Ok(Message::Ping) = Message::deserialize(&response) {
@@ -87,7 +87,7 @@ impl ClientSession {
8787
}
8888

8989
pub fn is_server_alive(&self) -> bool {
90-
self.last_ping.elapsed() < globals::CONNECTION_TIMEOUT_DURATION
90+
self.last_ping.elapsed() < globals::CONNECTION_TIMEOUT_SEC
9191
}
9292

9393
pub fn leave_server(&self, player_id: PlayerID) {
@@ -108,33 +108,53 @@ async fn join_server(
108108
client_socket: &UdpSocket,
109109
server_address: &String,
110110
) -> Result<Player, Box<dyn Error + Send + Sync>> {
111-
let msg = Message::Handshake.serialize();
112-
client_socket
113-
.send_to(msg.as_bytes(), server_address)
114-
.await?;
115-
message::trace(format!("Sent: {msg}"));
116-
117-
let mut ack_buf = [0u8; 32];
118-
let (len, _) = client_socket.recv_from(&mut ack_buf).await?;
119-
let response = String::from_utf8_lossy(&ack_buf[..len]).to_string();
120-
message::trace(format!("Handshake result: {response}"));
121-
122-
let result = match Message::deserialize(&response) {
123-
Ok(Message::Ack(new_id, new_color)) => Ok(Player::new(new_id, new_color)),
124-
Ok(_) => Err("Invalid handshake received".into()),
125-
Err(e) => Err(format!("Handshake failed: {e}").into()),
126-
};
127-
result
111+
let handshake_msg = Message::Handshake.serialize();
112+
// Loop abort happens on timeout in ClientSession::new()
113+
loop {
114+
client_socket
115+
.send_to(handshake_msg.as_bytes(), server_address)
116+
.await?;
117+
message::trace(format!("Sent: {handshake_msg}"));
118+
119+
match receive_with_retry_timeout(client_socket).await {
120+
Ok(response) => {
121+
if let Ok(Message::Ack(new_id, new_color)) = Message::deserialize(&response) {
122+
message::trace(format!("Handshake result: {response}"));
123+
return Ok(Player::new(new_id, new_color));
124+
}
125+
126+
message::trace(format!("Invalid handshake response: {response}"));
127+
}
128+
_ => continue, // Keep trying, I know you can do it!
129+
}
130+
}
131+
}
132+
133+
async fn receive_with_retry_timeout(
134+
socket: &UdpSocket,
135+
) -> Result<String, Box<dyn Error + Send + Sync>> {
136+
let retry_timeout = std::time::Duration::from_millis(300);
137+
let mut buf = [0u8; 32];
138+
match tokio::time::timeout(retry_timeout, socket.recv_from(&mut buf)).await {
139+
Ok(result) => {
140+
let (len, _) = result?;
141+
Ok(String::from_utf8_lossy(&buf[..len]).to_string())
142+
}
143+
Err(_) => {
144+
message::trace("No response (sender or receiver package lost)".to_string());
145+
Err("Receive operation timed out".into())
146+
}
147+
}
128148
}
129149

130-
async fn listen_handler(socket: Arc<UdpSocket>, tx: ChannelSender) {
150+
async fn listen_handler(socket: Arc<UdpSocket>, listen_tx: ChannelSender) {
131151
let mut buf = [0u8; 1024];
132152
loop {
133153
match socket.recv_from(&mut buf).await {
134154
Ok((len, _)) => {
135155
if let Ok(msg) = std::str::from_utf8(&buf[..len]) {
136156
// Pass message to main thread
137-
if tx.send(msg.to_string()).is_err() {
157+
if listen_tx.send(msg.to_string()).is_err() {
138158
break;
139159
}
140160
}

src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,15 @@ pub mod globals {
5353

5454
pub const LOCALHOST: &str = "127.0.0.1";
5555
pub const DEFAULT_PORT: u16 = 8080;
56-
pub const PING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
57-
pub const CONNECTION_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::from_secs(5);
56+
pub const PING_INTERVAL_MS: std::time::Duration = std::time::Duration::from_millis(20);
57+
pub const CONNECTION_TIMEOUT_SEC: std::time::Duration = std::time::Duration::from_secs(5);
5858

5959
pub const WINDOW_SIZE: (u16, u16) = (800, 600);
6060
pub const WINDOW_TITLE: &str = "Multiplayer game demo by Bálint Kiss";
6161

62+
pub const MAX_LOGIC_UPDATE_PER_SEC: f32 = 60.0;
63+
pub const FIXED_UPDATE_TIMESTEP_SEC: f32 = 1.0 / MAX_LOGIC_UPDATE_PER_SEC;
64+
6265
pub const PLAYER_QUAD_SIZE: f32 = 24.0;
6366
pub const WORLD_BOUNDS: WorldBounds = WorldBounds {
6467
min_x: -1200.0,

src/message.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@ pub enum Message {
1111
Ack(PlayerID, Vector3<f32>),
1212
Leave(PlayerID),
1313
Replicate(Player),
14+
// TODO: Avoid clients self-reporting their exact own position and opt for sending input state
15+
// instead
1416
Position(PlayerID, Vector2<f32>),
1517
}
1618

1719
const PING: &str = "PING";
1820
const HANDSHAKE: &str = "HANDSHAKE";
1921
const ACK: &str = "ACK";
2022
const LEAVE: &str = "LEAVE";
21-
const PUSH: &str = "PUSH";
23+
const REPL: &str = "REPL";
2224
const POS: &str = "POS";
2325

2426
impl Message {
@@ -68,7 +70,7 @@ impl Message {
6870
})?;
6971
Ok(Message::Leave(player_id))
7072
}
71-
Some(PUSH) if parts.len() == 3 => {
73+
Some(REPL) if parts.len() == 3 => {
7274
let player_id = parts[1].parse().map_err(|_| {
7375
std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid PlayerID")
7476
})?;
@@ -126,7 +128,7 @@ impl Message {
126128
Message::Handshake => HANDSHAKE,
127129
Message::Ack(_, _) => ACK,
128130
Message::Leave(_) => LEAVE,
129-
Message::Replicate(_) => PUSH,
131+
Message::Replicate(_) => REPL,
130132
Message::Position(_, _) => POS,
131133
}
132134
}

0 commit comments

Comments
 (0)