Changes from all commits
38 commits
422850f
Add the bytes crate to the workspace.
Makosai Apr 28, 2025
20eb306
Backup - squash
Makosai Apr 28, 2025
bc740c9
Remove Player enum from the list of packets. Implement client.
Makosai Apr 28, 2025
63052df
Implement the Master Server and improve the client.
Makosai Apr 29, 2025
4e6b4d3
Improve the master server.
Makosai Apr 29, 2025
b3b0cab
Update the imports for the master client.
Makosai Apr 29, 2025
2645200
Improve logging performance.
Makosai Apr 29, 2025
4072564
Implement cluster and improve all other crates.
Makosai Apr 29, 2025
33b8333
Start working on improving performance and design for the Master Server.
Makosai Apr 30, 2025
3cc812a
Remove tests from master.rs
Makosai Apr 30, 2025
aa96498
Finalize master_test.rs
Makosai Apr 30, 2025
d507359
Create a trait to centralize server design.
Makosai Apr 30, 2025
31f3be6
Migrate master to the newest and final structure.
Makosai Apr 30, 2025
098861f
Update main.rs
Makosai Apr 30, 2025
40b28f1
Simplify the master server bin entrypoint.
Makosai Apr 30, 2025
c7b2df1
Improve warning/documentation for master and master_client.
Makosai May 1, 2025
fdc64ab
Delete server.rs
Makosai May 1, 2025
1c8c312
Update master_test.rs
Makosai May 1, 2025
11e7640
Backup all research and tests before cleanup.
Makosai May 4, 2025
e2d69d4
Remove unused tests and create a temporary mio implementation for res…
Makosai May 6, 2025
c941a4d
Remove the Mio implementation now that it's backed up.
Makosai May 6, 2025
8d012ef
Add tests for performance with different hashmap/dashmap implementati…
Makosai May 6, 2025
7acf4d5
Removed useless async syntax.
Makosai May 6, 2025
7c41e0f
Create tests for researching faster ways to implement a listener.
Makosai May 7, 2025
ca7a169
Add more tests for performance against dyn invokes.
Makosai May 7, 2025
f8ad76a
Move tests to their own unpublished crate.
Makosai May 8, 2025
39332ff
Utilize send on the MasterClient instead.
Makosai May 8, 2025
6a4ea4b
Swap to mpsc, handle Init command, and improve / use config.
Makosai May 10, 2025
dae5e97
Fix all bugs and warnings for this migration.
Makosai May 11, 2025
f57cf60
Add num_cpus.
Makosai Jun 7, 2025
4473cd3
Upgrade all crates.
Makosai Jun 7, 2025
f81a8e0
Extend the possible packets.
Makosai Jun 7, 2025
8788a56
Add more documentation to the master file.
Makosai Jun 7, 2025
92e1142
Start using dashmap for connections.
Makosai Jun 7, 2025
4466c52
Remove tick from master and add it to cluster.
Makosai Jun 7, 2025
9a2e21a
Increase the event size.
Makosai Jun 9, 2025
dd370f0
Improve the cleanup for the server.
Makosai Jun 9, 2025
c51dc05
Silence the import warnings.
Makosai Aug 9, 2025
14 changes: 7 additions & 7 deletions rust/Cargo.toml
@@ -4,7 +4,7 @@ unsafe_code = "forbid"
[workspace]
resolver = "2"

members = ["master", "cluster", "client", "shared", "auth", "sustenet"]
members = ["master", "cluster", "client", "shared", "auth", "sustenet", "tests"]
exclude = ["backup"]

[workspace.package]
@@ -22,13 +22,13 @@ sustenet-master = { path = "master", version = "0.1.4" }
sustenet-shared = { path = "shared", version = "0.1.4" }

aes = "0.8.4"
public-ip = "0.2.2"

aes-gcm = "0.10.3"
base64 = "0.22.1"
config = "0.15.4"
ctrlc = "3.4.5"
bytes = "1.10.1"
config = "0.15.11"
dashmap = "6.1.0"
getrandom = "0.3.2"
getrandom = "0.3.3"
lazy_static = "1.5.0"
tokio = { version = "1.41.1", default-features = false, features = [] }
num_cpus = "1.17.0"
public-ip = "0.2.2"
tokio = { version = "1.45.1", default-features = false, features = [] }
12 changes: 0 additions & 12 deletions rust/Config.toml

This file was deleted.

26 changes: 26 additions & 0 deletions rust/auth/src/lib.rs
@@ -0,0 +1,26 @@
// This is the auth server. It handles all authentication from everywhere.
// I may also eventually make this distributed as well depending on the load.
// The idea is that the auth server will be a trusted endpoint for all clusters and clients.

// If some random person wants to host a server, how do we handle authentication without trusting them with the password? Simple, we're the middleman. For every server.

// 1. The client tells the cluster, "Yo, I'd like to authenticate."
// 2. The untrusted cluster tells their client, "Yo, your secret id is `5d600d55-2261-4b12-a543-3dc2f6f54a81`."
// 3. The Trusted Auth Server gets a message from the untrusted cluster and says, "Alright, I'll save your UUID for 30 seconds."
// 4. The client sends their UUID with their credentials.
// 5. The auth server says to the cluster server, "Yeah, they're good. Bye."
// 6. The untrusted cluster tells the client, "Alright, bossman said you're good to go. Come in."
// 7. The cluster server also gets a UID they can poll every few seconds if they want to. This is good for security
//    features like changing passwords and triggering an optional "sign out all clients". That will be done with a
//    WebSocket server that publishes when a specific UID wants to be logged out. This is safe because usernames and
//    emails are never public; only UIDs and display names are. So even if someone knew a specific UID was changed
//    recently, they have no real way to target that user, especially because we'll enforce that your username can
//    never match your display name UNLESS you have 2FA. If you have 2FA, we care a little less. Still not safe.

// Why do clusters from untrusted servers need authentication?
// We want to still be able to get their purchases.
// The cluster will get all of the purchases for the UID from the auth server and then send them to the client.
// The cluster will have a copy of the purchases.

// An idea to improve security, so people don't just spam requests for every possible UID, is to have a token the auth server encrypts
// and gives to the cluster server. The cluster server has to send that token back with the UID to get the purchases.
// It'll also send an expiration time for the token. The token is just the UID and the expiration time.

// Additionally, on a successful auth, the auth will send the client a token that allows them to access all of their data.
// THIS token should never be shared with the server. It's an actual token to their account. The cluster server can tell the
// client what their UID is at this point to save bandwidth on the auth server.
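The 30-second window in step 3 can be sketched with a small TTL map on the auth server. This is a minimal std-only sketch under stated assumptions, not the actual implementation: `PendingAuth`, `register`, and `redeem` are hypothetical names, entries are consumed one-shot on redemption, and a real server would presumably use a concurrent map (e.g. the workspace's dashmap) keyed by a proper UUID type.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Pending auth handshakes: cluster-issued secret id -> moment it expires.
struct PendingAuth {
    ttl: Duration,
    entries: HashMap<String, Instant>,
}

impl PendingAuth {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    /// Step 3: remember a cluster-issued secret id for `ttl`.
    fn register(&mut self, secret_id: &str) {
        self.entries.insert(secret_id.to_string(), Instant::now() + self.ttl);
    }

    /// Step 4: a client presented this id. Valid only if it exists and is
    /// unexpired; removing the entry makes redemption one-shot.
    fn redeem(&mut self, secret_id: &str) -> bool {
        match self.entries.remove(secret_id) {
            Some(expires_at) => Instant::now() <= expires_at,
            None => false,
        }
    }
}
```

A second `redeem` of the same id fails even inside the window, which closes the replay gap between steps 4 and 5.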
2 changes: 1 addition & 1 deletion rust/client/Cargo.toml
@@ -12,7 +12,7 @@ homepage.workspace = true
workspace = true

[dependencies]
lazy_static.workspace = true
bytes.workspace = true
sustenet-shared.workspace = true
tokio = { workspace = true, features = [
# "socket2",
276 changes: 276 additions & 0 deletions rust/client/src/client.rs
@@ -0,0 +1,276 @@
//! Handles connections to a server and sending messages.
use sustenet_shared::logging::{ LogType, Logger };
use sustenet_shared::lselect;
use sustenet_shared::packets::{ Connection, Diagnostics, Messaging };

use std::io::Error;
use std::sync::LazyLock;

use bytes::Bytes;
use tokio::io::AsyncReadExt;
use tokio::io::{ self, AsyncWriteExt };
use tokio::net::TcpStream;
use tokio::sync::{ broadcast, mpsc };

/// Global logger for the client module.
pub static LOGGER: LazyLock<Logger> = LazyLock::new(|| Logger::new(LogType::Client));

#[derive(Debug, Clone, PartialEq)]
pub struct ClusterInfo {
pub name: String,
pub ip: String,
pub port: u16,
pub max_connections: u32,
}

/// Events emitted by the client to notify listeners.
///
/// Should be handled with `event_receiver` or `next_event` externally.
#[derive(Debug, Clone)]
pub enum ClientEvent {
Connected,
Disconnected,
CommandSent(u8),
MessageSent(Bytes),
CommandReceived(u8),
MessageReceived(Bytes),
Error(String),
}

/// Handles connection to a master or cluster server, and provides async channels for interaction.
pub struct Client {
/// Sends messages to the server.
sender: mpsc::Sender<Bytes>,
/// Sends events to listeners.
event_tx: broadcast::Sender<ClientEvent>,
/// Receives events about connection state and activity.
event_rx: broadcast::Receiver<ClientEvent>,
/// Cluster servers this client knows about.
pub cluster_servers: Vec<ClusterInfo>,
}

impl Client {
/// Attempts to connect to a server at the specified address and port and returns a connected `Client`.
pub async fn connect(address: &str, port: u16) -> io::Result<Self> {
let addr = format!("{}:{}", address, port);
LOGGER.info(&format!("Connecting to {addr}..."));

// Establish a connection to the server.
let mut stream = match TcpStream::connect(&addr).await {
Ok(s) => {
LOGGER.success(&format!("Connected to {addr}"));
s
}
Err(e) => {
LOGGER.error(&format!("Failed to connect to {addr}"));
return Err(Error::new(e.kind(), format!("Failed to connect to ({addr}): {e}")));
}
};

let (sender, mut receiver) = mpsc::channel::<Bytes>(64);
let (event_tx, event_rx) = broadcast::channel::<ClientEvent>(16);

let sender_clone = sender.clone();
let event_tx_clone = event_tx.clone();

tokio::spawn(async move {
let (reader, mut writer) = stream.split();
let mut reader = io::BufReader::new(reader);

lselect!(
// Handle local requests to send a message to the server.
msg = receiver.recv() => {
match msg {
Some(msg) => {
if msg.is_empty() {
LOGGER.warning("Received empty message, shutting down client");
Self::handle_shutdown(writer, event_tx_clone).await;
break;
}

LOGGER.debug(&format!("Sending message: {:?}", msg));
if let Err(e) = writer.write_all(&msg).await {
let msg = format!("Failed to send message to server: {e}");
LOGGER.error(&msg);
let _ = event_tx_clone.send(ClientEvent::Error(msg));
} else {
let _ = event_tx_clone.send(ClientEvent::MessageSent(msg));
}
},
None => {
LOGGER.warning("Connection closed");
Self::handle_shutdown(writer, event_tx_clone).await;
break;
}
}
},
command = reader.read_u8() => {
match command {
Ok(command) => {
LOGGER.debug(&format!("Received command: {command}"));

Self::handle_command(command, &sender_clone, &mut reader, &mut writer, &event_tx_clone).await;

// Notify listeners about the received message.
let _ = event_tx_clone.send(ClientEvent::CommandReceived(command));
},
Err(e) => {
let msg = format!("Failed to read command from server: {e}");
LOGGER.error(&msg);
let _ = event_tx_clone.send(ClientEvent::Error(msg));
}
}
}
)
});

// Notify connected immediately.
let _ = event_tx.send(ClientEvent::Connected);

Ok(Client {
sender,
event_tx,
event_rx,
cluster_servers: Vec::new(),
})
}

async fn handle_shutdown(
mut writer: tokio::net::tcp::WriteHalf<'_>,
event_tx_clone: broadcast::Sender<ClientEvent>
) {
if let Err(e) = writer.shutdown().await {
let msg = format!("Failed to shutdown writer: {e}");
LOGGER.error(&msg);
let _ = event_tx_clone.send(ClientEvent::Error(msg));
}
let _ = event_tx_clone.send(ClientEvent::Disconnected);
}

/// Handles commands received from the server.
/// This function is called in a separate task to handle incoming commands.
async fn handle_command(
command: u8,
_sender: &mpsc::Sender<Bytes>,
_reader: &mut io::BufReader<tokio::net::tcp::ReadHalf<'_>>,
_writer: &mut tokio::net::tcp::WriteHalf<'_>,
event_tx: &broadcast::Sender<ClientEvent>
) {
// TODO: fully handle each command received from the server; for now each arm routes to a stub.
match command {
x if x == (Connection::Connect as u8) => Self::handle_connect_command().await,
x if x == (Connection::Disconnect as u8) => Self::handle_disconnect_command().await,
x if x == (Connection::Authenticate as u8) => Self::handle_authenticate_command().await,

x if x == (Messaging::SendGlobalMessage as u8) => {
Self::handle_send_global_message_command().await
}
x if x == (Messaging::SendPrivateMessage as u8) => {
Self::handle_send_private_message_command().await
}
x if x == (Messaging::SendPartyMessage as u8) => {
Self::handle_send_party_message_command().await
}
x if x == (Messaging::SendLocalMessage as u8) => {
Self::handle_send_local_message_command().await
}

x if x == (Diagnostics::CheckServerType as u8) => {
Self::handle_check_server_type_command().await
}
x if x == (Diagnostics::CheckServerUptime as u8) => {
Self::handle_check_server_uptime_command().await
}
x if x == (Diagnostics::CheckServerPlayerCount as u8) => {
Self::handle_check_server_player_count_command().await
}

_ => Self::handle_extra_command(command, event_tx).await,
}
}

async fn handle_connect_command() {
todo!();
}
async fn handle_disconnect_command() {
todo!();
}
async fn handle_authenticate_command() {
todo!();
}

async fn handle_send_global_message_command() {
todo!();
}
async fn handle_send_private_message_command() {
todo!();
}
async fn handle_send_party_message_command() {
todo!();
}
async fn handle_send_local_message_command() {
todo!();
}

async fn handle_check_server_type_command() {
todo!();
}
async fn handle_check_server_uptime_command() {
todo!();
}
async fn handle_check_server_player_count_command() {
todo!();
}

async fn handle_extra_command(command: u8, event_tx: &broadcast::Sender<ClientEvent>) {
let msg = format!("Unknown command received: {command}");
LOGGER.error(&msg);
let _ = event_tx.send(ClientEvent::Error(msg));
}

/// Sends data to the server.
pub async fn send(&self, msg: Bytes) -> Result<(), mpsc::error::SendError<Bytes>> {
self.sender.send(msg.clone()).await?;
let _ = self.event_tx.send(ClientEvent::MessageSent(msg));
Ok(())
}

/// Returns a cloneable event receiver for status updates.
pub fn event_receiver(&self) -> broadcast::Receiver<ClientEvent> {
self.event_rx.resubscribe()
}

/// Returns the next event from the event receiver.
pub async fn next_event(&mut self) -> Option<ClientEvent> {
self.event_rx.recv().await.ok()
}

// region: Cluster Server Utilities
pub fn get_cluster_servers(&self) -> &[ClusterInfo] {
&self.cluster_servers
}

pub fn add_cluster_server(&mut self, server: ClusterInfo) {
self.cluster_servers.push(server);
}

pub fn add_cluster_servers(&mut self, servers: Vec<ClusterInfo>) {
self.cluster_servers.extend(servers);
}

pub fn remove_cluster_server(&mut self, server: &ClusterInfo) {
self.cluster_servers.retain(|s| s != server);
}

pub fn clear_cluster_servers(&mut self) {
self.cluster_servers.clear();
}
// endregion: Cluster Server Utilities
}
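The `x if x == (Variant as u8)` guards in `handle_command` exist because Rust cannot match a raw `u8` directly against enum variants without a conversion. A self-contained sketch of the same dispatch pattern, using stand-in enums with made-up discriminants (the real `Connection`/`Messaging` packet enums live in `sustenet_shared::packets` and may use different values):

```rust
/// Stand-ins for the packet enums; discriminant values are illustrative only.
#[repr(u8)]
enum Connection {
    Connect = 0,
    Disconnect = 1,
}

#[repr(u8)]
enum Messaging {
    SendGlobalMessage = 10,
}

/// Routes a raw command byte with match guards, mirroring Client::handle_command.
fn dispatch(command: u8) -> &'static str {
    match command {
        x if x == (Connection::Connect as u8) => "connect",
        x if x == (Connection::Disconnect as u8) => "disconnect",
        x if x == (Messaging::SendGlobalMessage as u8) => "global message",
        _ => "unknown",
    }
}
```

An alternative is implementing `TryFrom<u8>` for each packet enum and matching on the resulting variants, which centralizes the conversion but costs a little boilerplate per enum.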