Skip to content

Commit 43a836b

Browse files
committed
made clients not have their own channel
1 parent de85634 commit 43a836b

File tree

3 files changed

+106
-129
lines changed

3 files changed

+106
-129
lines changed

server/src/network/client.rs

Lines changed: 81 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,112 +1,107 @@
1-
use crate::network::binary::var_int::{peek_var_int, VarInt};
2-
use crate::network::connection_state::ConnectionState;
3-
use crate::network::connection_state::ConnectionState::*;
4-
use crate::network::internal_packets::{ClientHandlerMessage, MainThreadMessage, NetworkThreadMessage};
5-
use crate::network::packets::packet_buffer::PacketBuffer;
6-
use crate::network::packets::packet_deserialize::PacketDeserializable;
7-
use crate::network::protocol::handshake::serverbound::Handshake;
8-
use crate::network::protocol::login::clientbound::LoginSuccess;
9-
use crate::network::protocol::login::serverbound::LoginStart;
10-
use crate::network::protocol::play::serverbound::Play;
11-
use crate::network::protocol::status::clientbound::{StatusPong, StatusResponse};
12-
use crate::network::protocol::status::serverbound::StatusPing;
13-
use crate::types::status::StatusBytes;
14-
use crate::player::player::{ClientId, GameProfile};
15-
use crate::GameProfileProperty;
1+
use std::collections::HashMap;
2+
163
use anyhow::bail;
174
use bytes::{Buf, Bytes, BytesMut};
185
use fstr::FString;
19-
use std::collections::HashMap;
20-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
21-
use tokio::net::TcpStream;
22-
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
6+
use tokio::{io::{self, AsyncReadExt, AsyncWriteExt}, net::{TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}}, sync::mpsc::UnboundedSender, task::JoinHandle};
237
use uuid::Uuid;
248

9+
use crate::{ClientId, GameProfile, GameProfileProperty, network::{binary::var_int::{VarInt, peek_var_int}, connection_state::ConnectionState, internal_packets::{MainThreadMessage, NetworkThreadMessage}, packets::{packet_buffer::PacketBuffer, packet_deserialize::PacketDeserializable}, protocol::{handshake::serverbound::Handshake, login::{clientbound::LoginSuccess, serverbound::LoginStart}, play::serverbound::Play, status::{clientbound::{StatusPong, StatusResponse}, serverbound::StatusPing}}}, types::status::StatusBytes};
10+
2511
#[derive(Debug, Clone)]
2612
pub struct Client {
2713
pub id: ClientId,
2814
pub connection_state: ConnectionState,
2915
// pub game_profile: Option<GameProfile>,
3016
}
3117

18+
pub struct ClientHandler {
19+
writer: OwnedWriteHalf,
20+
handle: JoinHandle<()>
21+
}
3222

33-
// main thread tx errors can be ignored since the network/client threads will close eachother properly regardless of client status,
34-
// which will in turn close the client handlers.
23+
impl ClientHandler {
24+
pub fn spawn(
25+
client_id: ClientId,
26+
socket: TcpStream,
27+
network_tx: UnboundedSender<NetworkThreadMessage>,
28+
main_tx: UnboundedSender<MainThreadMessage>,
29+
status: StatusBytes
30+
) -> Self {
31+
let (read, writer) = socket.into_split();
32+
let handle = tokio::spawn(run_client(client_id, read, network_tx, main_tx, status));
33+
Self { writer, handle }
34+
}
35+
36+
pub async fn send(&mut self, data: &Bytes) -> Result<(), io::Error> {
37+
self.writer.write_all(data).await
38+
}
39+
40+
// this does not inform the network thread about the read tasks shutdown.
41+
pub async fn disconnect(&mut self) -> Result<(), io::Error> {
42+
self.writer.shutdown().await?;
43+
self.handle.abort();
44+
Ok(())
45+
}
46+
47+
pub fn abort(&self) {
48+
self.handle.abort();
49+
}
50+
}
3551

36-
pub async fn handle_client(
37-
client_id: ClientId,
38-
mut socket: TcpStream,
39-
tx: UnboundedSender<ClientHandlerMessage>,
40-
mut rx: UnboundedReceiver<ClientHandlerMessage>,
52+
async fn run_client(
53+
client_id: ClientId,
54+
mut reader: OwnedReadHalf,
55+
network_tx: UnboundedSender<NetworkThreadMessage>,
4156
main_tx: UnboundedSender<MainThreadMessage>,
42-
network_tx: UnboundedSender<NetworkThreadMessage>,
4357
status: StatusBytes
4458
) {
4559
let mut client = Client {
4660
id: client_id,
47-
connection_state: Handshaking,
61+
connection_state: ConnectionState::Handshaking,
4862
// game_profile: None,
4963
};
5064
let mut bytes = BytesMut::new();
51-
65+
5266
loop {
53-
tokio::select! {
54-
result = socket.read_buf(&mut bytes) => {
55-
match result {
56-
Ok(0) => break, // channel closed normally
57-
Ok(_) => {
58-
// channel closes, if client sends invalid packets.
59-
if let Err(err) = read_packets(
60-
&mut bytes,
61-
&mut client,
62-
&tx,
63-
&main_tx,
64-
&status
65-
).await {
66-
eprintln!("client {client_id:?} errored: {err}");
67-
break;
68-
}
69-
},
70-
Err(e) => {
71-
eprintln!("Client {client_id:?} read error: {e}");
72-
break;
73-
}
67+
let res: Result<usize, io::Error> = reader.read_buf(&mut bytes).await;
68+
match res {
69+
Ok(0) => break,
70+
Ok(_) => {
71+
if let Err(err) = read_packets(
72+
&mut bytes,
73+
&mut client,
74+
&network_tx,
75+
&main_tx,
76+
&status
77+
).await {
78+
eprintln!("client {client_id:?} errored: {err}");
79+
break;
7480
}
7581
}
76-
77-
Some(message) = rx.recv() => {
78-
match message {
79-
ClientHandlerMessage::Send(data) => {
80-
if let Err(e) = socket.write_all(&data).await {
81-
eprintln!("write error: {e}");
82-
break
83-
}
84-
}
85-
ClientHandlerMessage::CloseHandler => break,
86-
}
82+
Err(e) => {
83+
eprintln!("Client {client_id:?} read error: {e}");
84+
break;
8785
}
8886
}
8987
}
90-
91-
if client.connection_state == ConnectionState::Play {
92-
let _ = network_tx.send(NetworkThreadMessage::ConnectionClosed { client_id });
93-
}
94-
println!("handle client for {client_id:?} closed.");
88+
println!("client reader closed");
89+
let _ = network_tx.send(NetworkThreadMessage::ConnectionClosed { client_id, connection_state: client.connection_state });
9590
}
9691

9792
async fn read_packets(
98-
buffer: &mut BytesMut,
99-
client: &mut Client,
100-
client_tx: &UnboundedSender<ClientHandlerMessage>,
93+
buffer: &mut BytesMut,
94+
client: &mut Client,
95+
network_tx: &UnboundedSender<NetworkThreadMessage>,
10196
main_tx: &UnboundedSender<MainThreadMessage>,
10297
status: &StatusBytes
10398
) -> anyhow::Result<()> {
10499
while let Some(mut buffer) = try_read_packet_slice(buffer) {
105100
match client.connection_state {
106-
Handshaking => handle_handshake(&mut buffer, client)?,
107-
Status => handle_status(&mut buffer, client_tx, status)?,
108-
Login => handle_login(&mut buffer, client, client_tx, main_tx)?,
109-
Play => {
101+
ConnectionState::Handshaking => handle_handshake(&mut buffer, client)?,
102+
ConnectionState::Status => handle_status(client.id, &mut buffer, network_tx, status)?,
103+
ConnectionState::Login => handle_login(&mut buffer, client, network_tx, main_tx)?,
104+
ConnectionState::Play => {
110105
let packet = Play::read(&mut buffer)?;
111106
if let Play::Invalid(packet_id) = packet {
112107
eprintln!("invalid packet: 0x{packet_id:02x}");
@@ -136,29 +131,20 @@ fn try_read_packet_slice(buf: &mut impl Buf) -> Option<Bytes> {
136131
Some(buf.copy_to_bytes(packet_len))
137132
}
138133

139-
pub enum HandleResult {
140-
Ok,
141-
Disconnect,
142-
}
143-
144134
fn handle_handshake(buffer: &mut impl Buf, client: &mut Client) -> anyhow::Result<()> {
145-
if let Ok(packet_id) = VarInt::read(buffer) {
146-
match *packet_id {
147-
0x00 => {
148-
let handshake = Handshake::read(buffer)?;
149-
client.connection_state = ConnectionState::from_id(handshake.next_state.0)?;
150-
Ok(())
151-
},
152-
_ => bail!("Unknown packet id during handshake")
135+
match *VarInt::read(buffer)? {
136+
0x00 => {
137+
let handshake = Handshake::read(buffer)?;
138+
client.connection_state = ConnectionState::from_id(handshake.next_state.0)?;
153139
}
154-
} else {
155-
bail!("Failed to read var_int packet id.")
140+
_ => bail!("Unknown pack id during handshake."),
156141
}
142+
Ok(())
157143
}
158-
159144
fn handle_status(
145+
client_id: ClientId,
160146
buffer: &mut impl Buf,
161-
client_tx: &UnboundedSender<ClientHandlerMessage>,
147+
network_tx: &UnboundedSender<NetworkThreadMessage>,
162148
status: &StatusBytes
163149
) -> anyhow::Result<()> {
164150
let packet_id = *VarInt::read(buffer)?;
@@ -177,20 +163,19 @@ fn handle_status(
177163
}
178164
_ => bail!("Unknown packet id during status")
179165
}
180-
// instead of sending to network tx,
181-
// could just write directly to socket?
182-
client_tx.send(ClientHandlerMessage::Send(packet_buffer.split_into_bytes()))?;
166+
167+
// we cannot directly send to socket since the read task cant own the write half, at least not without Arc<Mutex<>> i think
168+
network_tx.send(NetworkThreadMessage::SendPackets { client_id, buffer: packet_buffer.split_into_bytes() })?;
183169
Ok(())
184170
}
185171

186172
const FLAME_OF_WAR: &str = "ewogICJ0aW1lc3RhbXAiIDogMTc1OTQzODI1MzM2OCwKICAicHJvZmlsZUlkIiA6ICI4YTdhZDkyMzc3MjI0ZjIyOGMwNDI4Y2I1YmQ5NzJkYSIsCiAgInByb2ZpbGVOYW1lIiA6ICJGbGFtZU9mV2FyIiwKICAic2lnbmF0dXJlUmVxdWlyZWQiIDogdHJ1ZSwKICAidGV4dHVyZXMiIDogewogICAgIlNLSU4iIDogewogICAgICAidXJsIiA6ICJodHRwOi8vdGV4dHVyZXMubWluZWNyYWZ0Lm5ldC90ZXh0dXJlL2JiNDg4Njc1YjMxYTQyZTc5MDI0ZGUzOGY1YmQ3ODZhMzlmNzVhMmE2ZGJhMDk0NDc5MmQ0NDNjNjA1ZDE4ZjkiCiAgICB9CiAgfQp9";
187173
const FLAME_OF_WAR_SIG: &str = "UvRQflcS0w4KTJSN+fpqYxVBTwo6wb66JMp6seThrmSGwUmbPfs8WEK2TPBIcipG0kBjWWdDMUpXFZ5YMBshnb7kHh588oPeL0gja/m9yHGEgtfucyqudL3m4sq3iZnJbdO3yKnF/00WqelBI5fZ3zc9SDyAjLUL4QHIXPm4U/z3UH1ZnVjGc5bZbV7qXILw7pF00al8ks1kpOUeds8zjSpVMRMTF9WQww89jNjbpvzcKP97KOOBXPJB1cuTUi3DEe3/9omZhcfgDyZDDJkmF3hTVZx1ijKtknlKRJqFcUEmsL1XUgRxqLSYNt1D1XCjEJeWAyT5YDVtvuj3Oa/zEeWQa9WVSXaUTGpVpQBRJrTJmtLH4O4hDMz4j7M2T0lsbOg7sIqvWVRvmKptKlLWKSWk8tlYXrx+Ef4YN5iva8/xhnKZmfe/JmT8uIKtNiv8Zcrj1WXasJ4wz0JCEQBOJDJXnEU548Sk1nxAcmX/W8jHkMnXArE3LKkLdxD7e++Hw60pv3GcyvTou5Mlrmgo6rHk188Li4CU826i+z0OuodRtdY+vsQIoFWLnnHu4HdqKA3IevcV7+Gl3FDzbzPXiSbUmSAV4drpLELTTPMnhhvMK85zS8138LTuScBiFRKVaSuXZJS7UIJ6VtjYK+iEuVblN9BJihP2NiuubCeL484=";
188174

189-
190175
fn handle_login(
191176
buffer: &mut impl Buf,
192177
client: &mut Client,
193-
client_tx: &UnboundedSender<ClientHandlerMessage>,
178+
network_tx: &UnboundedSender<NetworkThreadMessage>,
194179
main_tx: &UnboundedSender<MainThreadMessage>,
195180
) -> anyhow::Result<()> {
196181
let packet_id = *VarInt::read(buffer)?;
@@ -222,7 +207,7 @@ fn handle_login(
222207
uuid: uuid.hyphenated().to_string(),
223208
name: game_profile.username.clone(),
224209
});
225-
client_tx.send(ClientHandlerMessage::Send(packet_buffer.split_into_bytes()))?;
210+
network_tx.send(NetworkThreadMessage::SendPackets { client_id: client.id, buffer: packet_buffer.split_into_bytes() })?;
226211

227212
main_tx.send(MainThreadMessage::NewPlayer {
228213
client_id: client.id,

server/src/network/internal_packets.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::network::connection_state::ConnectionState;
12
use crate::network::protocol::play::serverbound::Play;
23
use crate::player::player::{ClientId, GameProfile};
34
use crate::types::status::StatusUpdate;
@@ -12,26 +13,21 @@ pub enum NetworkThreadMessage {
1213
},
1314

1415
/// received when the client's handler is closed.
15-
/// sends a client disconnected message to the main thread
16+
/// sends a client disconnected message to the main thread if the connect state is play
1617
ConnectionClosed {
1718
client_id: ClientId,
19+
connection_state: ConnectionState,
1820
},
1921

2022
/// Disconnects the client from the server.
21-
/// This sends a close handler message to the client's handler.
23+
/// This aborts the client's read task and drops the write half.
2224
/// It should be sent after the vanilla disconnect packet is sent.
2325
/// the main thread should wait for a ClientDisconnected response to handle actually removing the player.
2426
DisconnectClient {
2527
client_id: ClientId,
2628
},
2729
}
2830

29-
pub enum ClientHandlerMessage {
30-
Send(Bytes),
31-
/// Closes the handler for this client. This then sends a connection closed message to the network thread.
32-
CloseHandler,
33-
}
34-
3531
pub enum MainThreadMessage {
3632
PacketReceived {
3733
client_id: ClientId,

server/src/network/network.rs

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
1-
use crate::network::client::handle_client;
1+
use crate::network::client::{ClientHandler};
2+
use crate::network::connection_state::ConnectionState;
23
use crate::types::status::Status;
34
use core::panic;
45
use std::collections::HashMap;
56
use tokio::net::TcpListener;
67
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
78

8-
use crate::network::internal_packets::{ClientHandlerMessage, MainThreadMessage, NetworkThreadMessage};
9+
use crate::network::internal_packets::{MainThreadMessage, NetworkThreadMessage};
910
use crate::player::player::ClientId;
1011

1112
type Sender<T> = UnboundedSender<T>;
1213
type Receiver<T> = UnboundedReceiver<T>;
1314

14-
type ClientMap = HashMap<ClientId, UnboundedSender<ClientHandlerMessage>>;
15+
type ClientMap = HashMap<ClientId, ClientHandler>;
1516

1617
pub fn start_network(
1718
ip: &'static str,
@@ -45,20 +46,9 @@ async fn run_network_thread(
4546
let Ok((socket, _)) = result else { continue };
4647
let client_id: ClientId = client_id_counter;
4748
client_id_counter += 1;
48-
49-
// do we need tx and rx for each client?
50-
//
51-
let (client_tx, client_rx) = unbounded_channel::<ClientHandlerMessage>();
52-
clients.insert(client_id, client_tx.clone());
53-
tokio::spawn(handle_client(
54-
client_id,
55-
socket,
56-
client_tx,
57-
client_rx,
58-
main_tx.clone(),
59-
network_tx.clone(),
60-
status.get()
61-
));
49+
50+
let handler = ClientHandler::spawn(client_id, socket, network_tx.clone(), main_tx.clone(), status.get());
51+
clients.insert(client_id, handler);
6252
}
6353

6454
// this can never be none since this function owns a network_tx.
@@ -68,24 +58,30 @@ async fn run_network_thread(
6858
match msg {
6959
NetworkThreadMessage::UpdateStatus(update) => status.set(update),
7060
NetworkThreadMessage::SendPackets { client_id, buffer } => {
71-
if let Some(client_tx) = clients.get(&client_id) {
72-
if let Err(e) = client_tx.send(ClientHandlerMessage::Send(buffer)) {
73-
eprintln!("Client {} handler dropped its reciever: {}", client_id, e);
61+
if let Some(handler) = clients.get_mut(&client_id) {
62+
if let Err(e) = handler.send(&buffer).await {
63+
eprintln!("Client {} handler failed to send: {}", client_id, e);
7464
disconnect_client(client_id, &main_tx, &mut clients);
7565
}
7666
}
7767
}
7868

7969
NetworkThreadMessage::DisconnectClient { client_id } => {
80-
if let Some(client_tx) = clients.get(&client_id) {
81-
if let Err(e) = client_tx.send(ClientHandlerMessage::CloseHandler) {
82-
eprintln!("Client {} handler dropped its reciever: {}", client_id, e);
83-
disconnect_client(client_id, &main_tx, &mut clients);
70+
if let Some(handler) = clients.get_mut(&client_id) {
71+
if let Err(e) = handler.disconnect().await {
72+
eprintln!("Client {} writer failed to shutdown: {}", client_id, e);
8473
}
74+
disconnect_client(client_id, &main_tx, &mut clients);
8575
}
8676
}
8777

88-
NetworkThreadMessage::ConnectionClosed { client_id } => disconnect_client(client_id, &main_tx, &mut clients),
78+
NetworkThreadMessage::ConnectionClosed { client_id, connection_state } => {
79+
if connection_state == ConnectionState::Play {
80+
// we probably shouldnt tell the main thread a client it never added got disconnected?
81+
main_tx.send(MainThreadMessage::ClientDisconnected { client_id }).expect("Main thread should never drop its network reciever.");
82+
}
83+
clients.remove(&client_id);
84+
}
8985
}
9086
}
9187
}

0 commit comments

Comments
 (0)