Skip to content

Commit 5de065b

Browse files
committed
Refacctored network and main thread channel handling to gracefully exit
on unrecoverable errors and properly handle recoverable ones.
1 parent 58c0866 commit 5de065b

File tree

5 files changed

+41
-30
lines changed

5 files changed

+41
-30
lines changed

src/main.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::server::utils::dvec3::DVec3;
3131
use crate::server::world::VIEW_DISTANCE;
3232
use crate::utils::hasher::deterministic_hasher::DeterministicHashMap;
3333
use crate::utils::seeded_rng::SeededRng;
34-
use anyhow::Result;
34+
use anyhow::{bail, Result};
3535
use chrono::Local;
3636
use include_dir::include_dir;
3737
use indoc::formatdoc;
@@ -41,6 +41,7 @@ use std::env;
4141
use std::ops::Add;
4242
use std::time::{Duration, SystemTime, UNIX_EPOCH};
4343
use tokio::sync::mpsc::unbounded_channel;
44+
use tokio::sync::mpsc::error::TryRecvError;
4445

4546
#[tokio::main]
4647
async fn main() -> Result<()> {
@@ -165,7 +166,7 @@ async fn main() -> Result<()> {
165166

166167
// test
167168
pub struct MortImpl;
168-
169+
169170
impl EntityImpl for MortImpl {
170171
fn tick(&mut self, _: &mut Entity, _: &mut PacketBuffer) {
171172
// rotate
@@ -177,7 +178,7 @@ async fn main() -> Result<()> {
177178
player.open_ui(UI::MortReadyUpMenu);
178179
}
179180
}
180-
181+
181182
let id = server.world.spawn_entity(
182183
room.get_world_block_pos(&BlockPos { x: 15, y: 69, z: 4 })
183184
.as_dvec3()
@@ -225,10 +226,14 @@ async fn main() -> Result<()> {
225226
tick_interval.tick().await;
226227
// let start = std::time::Instant::now();
227228

228-
while let Ok(message) = main_rx.try_recv() {
229-
server.process_event(message).unwrap_or_else(|err| eprintln!("Error processing event: {err}"));
229+
loop {
230+
match main_rx.try_recv() {
231+
Ok(message) => server.process_event(message).unwrap_or_else(|err| eprintln!("Error processing event: {err}")),
232+
Err(TryRecvError::Empty) => break,
233+
Err(TryRecvError::Disconnected) => bail!("Network thread dropped its reciever."),
234+
}
230235
}
231-
236+
232237
server.dungeon.tick()?;
233238
server.world.tick()?;
234239

src/net/client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ impl Client {
2929
}
3030
}
3131

32+
// main thread tx errors can be ignored since the network/client threads will close eachother properly regardless of client status,
33+
// which will in turn close the client handlers.
34+
3235
pub async fn handle_client(
3336
client_id: ClientId,
3437
mut socket: TcpStream,

src/net/internal_packets.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub enum NetworkThreadMessage {
1818
/// Disconnects the client from the server.
1919
/// This sends a close handler message to the client's handler.
2020
/// It should be sent after the vanilla disconnect packet is sent.
21+
/// the main thread should wait for a ClientDisconnected response to handle actually removing the player.
2122
DisconnectClient {
2223
client_id: ClientId,
2324
},
@@ -40,11 +41,8 @@ pub enum MainThreadMessage {
4041
profile: GameProfile,
4142
},
4243

44+
/// sent to the main thread when a client is removed for any reason, even reasons caused by the main thread.
4345
ClientDisconnected {
4446
client_id: ClientId,
4547
},
46-
47-
Abort {
48-
reason: String,
49-
},
5048
}

src/net/run_network.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,22 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
77
use crate::net::internal_packets::{ClientHandlerMessage, MainThreadMessage, NetworkThreadMessage};
88
use crate::server::player::player::ClientId;
99

10-
/// runs the network thread. It is very important that nothing here panics without alerting the main thread.
11-
/// if a handle_client panics, it should send a disconnect player to the main thread. (however we dont have anything that could panic there atm)
10+
type ClientMap = HashMap<ClientId, UnboundedSender<ClientHandlerMessage>>;
11+
1212
pub async fn run_network_thread(
1313
mut network_rx: UnboundedReceiver<NetworkThreadMessage>,
1414
network_tx: UnboundedSender<NetworkThreadMessage>,
1515
main_tx: UnboundedSender<MainThreadMessage>,
1616
) {
17-
let listener = TcpListener::bind("127.0.0.1:4972").await.unwrap_or_else(|err| {
18-
let _ = main_tx.send(MainThreadMessage::Abort { reason: format!("TCP failed to bind: {}", err) });
19-
panic!("{}", err)
20-
});
17+
let listener = TcpListener::bind("127.0.0.1:4972").await.unwrap();
2118
println!("Network thread listening on 127.0.0.1:4972");
2219

23-
let mut clients: HashMap<ClientId, UnboundedSender<ClientHandlerMessage>> = HashMap::new();
20+
let mut clients: ClientMap = HashMap::new();
2421
let mut client_id_counter: ClientId = 1;
2522

2623
loop {
2724
tokio::select! {
25+
// a client failing to connect here is recoverable and doesnt really do anything, so we can just ignore it.
2826
Ok((socket, _)) = listener.accept() => {
2927
let client_id: ClientId = client_id_counter;
3028
client_id_counter += 1;
@@ -35,28 +33,39 @@ pub async fn run_network_thread(
3533
tokio::spawn(handle_client(client_id, socket, client_rx, main_tx.clone(), network_tx.clone()));
3634
}
3735

36+
// this can never be none since this function owns a network_tx.
3837
Some(msg) = network_rx.recv() => {
39-
match msg {
38+
// we can just discard main thread -> network thread messages with a disconnected client_id
39+
// as the main thread either already has or will be be informed shortly of this issue
40+
match msg {
4041
NetworkThreadMessage::SendPackets { client_id, buffer } => {
4142
if let Some(client_tx) = clients.get(&client_id) {
42-
let _ = client_tx.send(ClientHandlerMessage::Send(buffer));
43+
if let Err(e) = client_tx.send(ClientHandlerMessage::Send(buffer)) {
44+
eprintln!("Client {} handler dropped its reciever: {}", client_id, e);
45+
disconnect_client(client_id, &main_tx, &mut clients);
46+
}
4347
}
4448
}
45-
49+
4650
NetworkThreadMessage::DisconnectClient { client_id } => {
4751
if let Some(client_tx) = clients.get(&client_id) {
48-
let _ = client_tx.send(ClientHandlerMessage::CloseHandler);
49-
} else {
50-
eprintln!("Attempted to disconnect nonexistent client {}", client_id);
52+
if let Err(e) = client_tx.send(ClientHandlerMessage::CloseHandler) {
53+
eprintln!("Client {} handler dropped its reciever: {}", client_id, e);
54+
disconnect_client(client_id, &main_tx, &mut clients);
55+
}
5156
}
5257
}
53-
58+
5459
NetworkThreadMessage::ConnectionClosed { client_id } => {
55-
let _ = main_tx.send(MainThreadMessage::ClientDisconnected { client_id });
56-
clients.remove(&client_id);
60+
disconnect_client(client_id, &main_tx, &mut clients);
5761
}
5862
}
5963
}
6064
}
6165
}
66+
}
67+
68+
fn disconnect_client(client_id: ClientId, main_tx: &UnboundedSender<MainThreadMessage>, clients: &mut ClientMap) {
69+
main_tx.send(MainThreadMessage::ClientDisconnected { client_id }).expect("Main thread should never drop its network reciever.");
70+
clients.remove(&client_id);
6271
}

src/server/server.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::dungeon::dungeon::Dungeon;
22
use crate::net::internal_packets::{MainThreadMessage, NetworkThreadMessage};
33
use crate::net::packets::packet::ProcessPacket;
4-
use crate::net::packets::packet_serialize::PacketSerializable;
54
use crate::net::protocol::play::clientbound::{AddEffect, CustomPayload, EntityProperties, JoinGame, PlayerAbilities, PlayerListHeaderFooter, PositionLook};
65
use crate::net::var_int::VarInt;
76
use crate::server::items::Item;
@@ -193,9 +192,6 @@ impl Server {
193192
let player = self.world.players.get_mut(&client_id).context(format!("Player not found for id {client_id}"))?;
194193
packet.process_with_player(player);
195194
},
196-
MainThreadMessage::Abort { reason } => {
197-
panic!("Network called for shutdown: {}", reason);
198-
},
199195
}
200196
Ok(())
201197
}

0 commit comments

Comments
 (0)