diff --git a/chatty-tcp/src/bin/server.rs b/chatty-tcp/src/bin/server.rs index ef07ebe..097aab0 100644 --- a/chatty-tcp/src/bin/server.rs +++ b/chatty-tcp/src/bin/server.rs @@ -6,7 +6,7 @@ use chatty_tcp::listen::room::serve; use chatty_tcp::listen::state::RoomState; use chatty_types::config::{setup_tracing, Component::Server}; use chatty_types::response::ChatResponse; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::broadcast; @@ -27,17 +27,12 @@ pub async fn main() -> Result<()> { span.in_scope(|| info!("listening on {}", listening_on)); // Set up room state for use - let user_set = Mutex::new(HashSet::new()); // bounded channel let (tx, _rx) = broadcast::channel::(100); // task handles let task_handles = Mutex::new(HashMap::new()); - let room_state = Arc::new(RoomState { - user_set, - tx, - task_handles, - }); + let room_state = Arc::new(RoomState { tx, task_handles }); let mut connection_handles = Vec::new(); diff --git a/chatty-tcp/src/listen/command.rs b/chatty-tcp/src/listen/command.rs index 894b0cd..bf16d39 100644 --- a/chatty-tcp/src/listen/command.rs +++ b/chatty-tcp/src/listen/command.rs @@ -40,13 +40,10 @@ pub async fn process_command( let command: ChatCommand = serde_json::from_str(&line)?; match command { ChatCommand::Join(username) => { - let mut users = room_state.user_set.lock().await; - let user_already_exist = users.contains(&username); + let user_already_exist = + room_state.task_handles.lock().await.contains_key(&username); let chat_response = if !user_already_exist { - users.insert(username.clone()); - info!("Users in room after addition: {:?}", users); - info!("Client {} joined as {}", addr, username); let rx = room_state.tx.subscribe(); let send_task_handle = tokio::spawn(send_from_broadcast_channel_task( writer.clone(), @@ -58,6 +55,11 @@ pub async fn process_command( .lock() .await .insert(username.clone(), send_task_handle); + info!( + "Users in room after addition: {:?}", + room_state.task_handles.lock().await.keys() + ); + info!("Client {} joined as {}", addr, username); send_to_broadcast_channel( ChatResponse::Broadcast(ChatMemo { username: username.clone(), @@ -96,10 +98,7 @@ pub async fn process_command( ChatCommand::Leave(username) => { remove_username(username.clone(), room_state.clone()).await; debug!("User {} has left", username); - if let Some(handle) = room_state.task_handles.lock().await.remove(&username) { - info!("Aborting background task for user: {}", username); - handle.abort(); - } + debug!("User {} has left so sending broadcast message", username); send_to_broadcast_channel( ChatResponse::Broadcast(ChatMemo { @@ -117,40 +116,45 @@ pub async fn process_command( } pub async fn remove_username(username: String, room_state: Arc) { - let mut users = room_state.user_set.lock().await; - users.remove(&username); + let mut lookup = room_state.task_handles.lock().await; + if let Some(handle) = lookup.remove(&username) { + info!("Aborting background task for user: {}", username); + handle.abort(); + } info!("User {} removed from room", username); // list connected users - let users: Vec = users.iter().cloned().collect(); + let users: Vec = lookup.keys().cloned().collect(); info!("Users in room after removal: {:?}", users); } #[cfg(test)] mod tests { use super::*; - use std::collections::{HashMap, HashSet}; + use std::collections::HashMap; use tokio::sync::broadcast; + use tokio::task::JoinHandle; #[tokio::test] async fn test_remove_username() { - let mut user_set = HashSet::new(); - user_set.insert("test_user".to_string()); - user_set.insert("other_user".to_string()); + let mut lookup_initial = HashMap::new(); + let dummy_task: JoinHandle> = tokio::spawn(async { Ok(()) }); + lookup_initial.insert("test_user".to_string(), dummy_task); + let dummy_task2: JoinHandle> = tokio::spawn(async { Ok(()) }); + lookup_initial.insert("other_user".to_string(), dummy_task2); let (tx, _) = broadcast::channel(100); let room_state = Arc::new(RoomState { - user_set: Mutex::new(user_set), tx, - task_handles: Mutex::new(HashMap::new()), + task_handles: Mutex::new(lookup_initial), }); // Execute removal remove_username("test_user".to_string(), room_state.clone()).await; // Verify user was removed - let users = room_state.user_set.lock().await; - assert!(!users.contains("test_user")); - assert!(users.contains("other_user")); - assert_eq!(users.len(), 1); + let lookup = room_state.task_handles.lock().await; + assert!(!lookup.contains_key("test_user")); + assert!(lookup.contains_key("other_user")); + assert_eq!(lookup.len(), 1); } } diff --git a/chatty-tcp/src/listen/state.rs b/chatty-tcp/src/listen/state.rs index 879574f..fc23af1 100644 --- a/chatty-tcp/src/listen/state.rs +++ b/chatty-tcp/src/listen/state.rs @@ -1,6 +1,6 @@ use crate::listen::command::RoomError; use chatty_types::response::ChatResponse; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use tokio::sync::broadcast; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -8,7 +8,6 @@ use tokio::task::JoinHandle; type TaskHandleMap = Mutex>>>; pub struct RoomState { - pub user_set: Mutex>, pub tx: broadcast::Sender, pub task_handles: TaskHandleMap, } diff --git a/chatty-tcp/tests/client_server_integration.rs b/chatty-tcp/tests/client_server_integration.rs index ee216c4..fd298d6 100644 --- a/chatty-tcp/tests/client_server_integration.rs +++ b/chatty-tcp/tests/client_server_integration.rs @@ -4,7 +4,7 @@ use chatty_tcp::listen::state::RoomState; use chatty_types::config::setup_tracing; use chatty_types::config::Component::Server; use chatty_types::response::ChatResponse; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpStream; @@ -25,15 +25,10 @@ fn init_tracing_for_tests() { async fn single_client() { init_tracing_for_tests(); // Set up room state - let user_set = Mutex::new(HashSet::new()); let (tx, _rx) = broadcast::channel::(100); let task_handles = Mutex::new(HashMap::new()); - let room_state = Arc::new(RoomState { - user_set, - tx, - task_handles, - }); + let room_state = Arc::new(RoomState { tx, task_handles }); // Start the server in a background task let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); @@ -83,8 +78,8 @@ async fn single_client() { // // Verify user is there let room_state_for_removal = room_state.clone(); - let users = room_state_for_removal.user_set.lock().await; - assert!(users.contains("alone")); + let lookup = room_state_for_removal.task_handles.lock().await; + assert!(lookup.contains_key("alone")); // leave command let leave_command = r#"{"Leave":"alone"}"#; @@ -104,12 +99,7 @@ async fn multiple_clients() { let task_handles = Mutex::new(HashMap::new()); // Set up room state - let user_set = Mutex::new(HashSet::new()); - let room_state = Arc::new(RoomState { - user_set, - tx, - task_handles, - }); + let room_state = Arc::new(RoomState { tx, task_handles }); let state = room_state.clone(); // Start the server in a background task @@ -194,9 +184,9 @@ async fn multiple_clients() { let expected_message2 = r#"{"Broadcast":{"username":"carl","content":"Left"}}"#; assert_eq!(broadcast_message, expected_message2); - let user_set = state.user_set.lock().await; - assert_eq!(user_set.len(), 1); - assert!(user_set.contains("david")); + let lookup = state.task_handles.lock().await; + assert_eq!(lookup.len(), 1); + assert!(lookup.contains_key("david")); // Clean up server_handle.abort(); diff --git a/demo/Weather-Standup.mp4 b/demo/Weather-Standup.mp4 index 20aa184..c664676 100644 Binary files a/demo/Weather-Standup.mp4 and b/demo/Weather-Standup.mp4 differ