Skip to content

Commit a05161b

Browse files
committed
reverted client handling to use channels and a write task per client
1 parent 531dab2 commit a05161b

File tree

3 files changed

+48
-39
lines changed

3 files changed

+48
-39
lines changed

server/src/network/client.rs

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::bail;
44
use bytes::{Buf, Bytes, BytesMut};
55
use fstr::FString;
66
use slotmap::new_key_type;
7-
use tokio::{io::{self, AsyncReadExt, AsyncWriteExt}, net::{TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}}, sync::mpsc::UnboundedSender, task::JoinHandle};
7+
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, sync::mpsc::{UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel}, task::JoinHandle};
88
use uuid::Uuid;
99

1010
use crate::{ClientId, GameProfile, GameProfileProperty, network::{binary::var_int::{VarInt, peek_var_int}, connection_state::ConnectionState, internal_packets::{MainThreadMessage, NetworkThreadMessage}, packets::{packet_buffer::PacketBuffer, packet_deserialize::PacketDeserializable}, protocol::{handshake::serverbound::Handshake, login::{clientbound::LoginSuccess, serverbound::LoginStart}, play::serverbound::Play, status::{clientbound::{StatusPong, StatusResponse}, serverbound::StatusPing}}}, types::status::StatusBytes};
@@ -21,7 +21,7 @@ pub struct Client {
2121
}
2222

2323
pub struct ClientHandler {
24-
writer: OwnedWriteHalf,
24+
writer: UnboundedSender<Bytes>,
2525
handle: JoinHandle<()>
2626
}
2727

@@ -33,30 +33,24 @@ impl ClientHandler {
3333
main_tx: UnboundedSender<MainThreadMessage>,
3434
status: StatusBytes
3535
) -> Self {
36-
let (read, writer) = socket.into_split();
37-
let handle = tokio::spawn(run_client(client_id, read, network_tx, main_tx, status));
38-
Self { writer, handle }
36+
let (tx, rx) = unbounded_channel();
37+
let handle = tokio::spawn(run_client(client_id, socket, rx, network_tx, main_tx, status));
38+
Self { writer: tx, handle }
3939
}
4040

41-
pub async fn send(&mut self, data: &Bytes) -> Result<(), io::Error> {
42-
self.writer.write_all(data).await
41+
pub fn send(&self, data: Bytes) -> Result<(), SendError<Bytes>> {
42+
self.writer.send(data)
4343
}
4444

45-
// this does not inform the network thread about the read tasks shutdown.
46-
pub async fn disconnect(mut self) -> Result<(), io::Error> {
47-
self.handle.abort();
48-
self.writer.shutdown().await?;
49-
Ok(())
50-
}
51-
52-
pub fn abort(&self) {
45+
pub fn abort(self) {
5346
self.handle.abort();
5447
}
5548
}
5649

5750
async fn run_client(
5851
client_id: ClientId,
59-
mut reader: OwnedReadHalf,
52+
mut socket: TcpStream,
53+
mut rx: UnboundedReceiver<Bytes>,
6054
network_tx: UnboundedSender<NetworkThreadMessage>,
6155
main_tx: UnboundedSender<MainThreadMessage>,
6256
status: StatusBytes
@@ -69,27 +63,44 @@ async fn run_client(
6963
let mut bytes = BytesMut::new();
7064

7165
loop {
72-
let res: Result<usize, io::Error> = reader.read_buf(&mut bytes).await;
73-
match res {
74-
Ok(0) => break,
75-
Ok(_) => {
76-
if let Err(err) = read_packets(
77-
&mut bytes,
78-
&mut client,
79-
&network_tx,
80-
&main_tx,
81-
&status
82-
).await {
83-
eprintln!("client {client_id:?} errored: {err}");
84-
break;
66+
tokio::select! {
67+
res = socket.read_buf(&mut bytes) => {
68+
match res {
69+
Ok(0) => break,
70+
Ok(_) => {
71+
if let Err(err) = read_packets(
72+
&mut bytes,
73+
&mut client,
74+
&network_tx,
75+
&main_tx,
76+
&status
77+
).await {
78+
eprintln!("client {client_id:?} errored: {err}");
79+
break;
80+
}
81+
},
82+
Err(e) => {
83+
eprintln!("Client {client_id:?} read error: {e}");
84+
break;
85+
}
8586
}
8687
}
87-
Err(e) => {
88-
eprintln!("Client {client_id:?} read error: {e}");
89-
break;
88+
89+
// we dont need a drop task if we just drop the tx...
90+
opt = rx.recv() => {
91+
match opt {
92+
Some(bytes) => {
93+
if let Err(e) = socket.write_all(&bytes).await {
94+
eprintln!("Socket write error: {}", e);
95+
break
96+
}
97+
}
98+
None => break,
99+
}
90100
}
91101
}
92102
}
103+
93104
println!("client reader closed");
94105
let _ = network_tx.send(NetworkThreadMessage::ConnectionClosed { client_id, connection_state: client.connection_state });
95106
}

server/src/network/network.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,17 @@ async fn run_network_thread(
5555
match msg {
5656
NetworkThreadMessage::UpdateStatus(update) => status.set(update),
5757
NetworkThreadMessage::SendPackets { client_id, buffer } => {
58-
if let Some(handler) = clients.get_mut(client_id) {
59-
if let Err(e) = handler.send(&buffer).await {
60-
eprintln!("Client {:?} handler failed to send: {}", client_id, e);
58+
if let Some(handler) = clients.get(client_id) {
59+
if let Err(e) = handler.send(buffer) {
60+
eprintln!("Client {:?} handler dropped its reciever {}", client_id, e);
6161
clients.remove(client_id);
6262
main_tx.send(MainThreadMessage::ClientDisconnected { client_id }).expect("Main thread should never drop its network reciever.");
6363
}
6464
}
6565
}
6666

6767
NetworkThreadMessage::DisconnectClient { client_id } => {
68-
if let Some(handler) = clients.remove(client_id) {
69-
if let Err(e) = handler.disconnect().await {
70-
eprintln!("Client {:?} writer failed to shutdown: {}", client_id, e);
71-
}
68+
if clients.remove(client_id).is_some() {
7269
main_tx.send(MainThreadMessage::ClientDisconnected { client_id }).expect("Main thread should never drop its network reciever.");
7370
}
7471
}

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub fn spawn_mort(world: &mut World<Dungeon>) {
6262
Some(EntityMetadata::new(EntityVariant::NPC { npc_id: "mort" })),
6363
position, yaw, 0.0,
6464
InteractableNPC { default_yaw: yaw, default_pitch: 0.0, interact_callback: |player| {
65+
player.world().network_tx.send(NetworkThreadMessage::DisconnectClient { client_id: player.client_id });
6566
// todo: messages / dialogue
6667
if let DungeonState::Started { .. } = player.world().state {
6768
return;

0 commit comments

Comments
 (0)