Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions chatty-tcp/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chatty_tcp::listen::room::serve;
use chatty_tcp::listen::state::RoomState;
use chatty_types::config::{setup_tracing, Component::Server};
use chatty_types::response::ChatResponse;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
Expand All @@ -27,17 +27,12 @@ pub async fn main() -> Result<()> {
span.in_scope(|| info!("listening on {}", listening_on));

// Set up room state for use
let user_set = Mutex::new(HashSet::new());
// bounded channel
let (tx, _rx) = broadcast::channel::<ChatResponse>(100);
// task handles
let task_handles = Mutex::new(HashMap::new());

let room_state = Arc::new(RoomState {
user_set,
tx,
task_handles,
});
let room_state = Arc::new(RoomState { tx, task_handles });

let mut connection_handles = Vec::new();

Expand Down
48 changes: 26 additions & 22 deletions chatty-tcp/src/listen/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@ pub async fn process_command(
let command: ChatCommand = serde_json::from_str(&line)?;
match command {
ChatCommand::Join(username) => {
let mut users = room_state.user_set.lock().await;
let user_already_exist = users.contains(&username);
let user_already_exist =
room_state.task_handles.lock().await.contains_key(&username);

let chat_response = if !user_already_exist {
users.insert(username.clone());
info!("Users in room after addition: {:?}", users);
info!("Client {} joined as {}", addr, username);
let rx = room_state.tx.subscribe();
let send_task_handle = tokio::spawn(send_from_broadcast_channel_task(
writer.clone(),
Expand All @@ -58,6 +55,11 @@ pub async fn process_command(
.lock()
.await
.insert(username.clone(), send_task_handle);
info!(
"Users in room after addition: {:?}",
room_state.task_handles.lock().await.keys()
);
info!("Client {} joined as {}", addr, username);
send_to_broadcast_channel(
ChatResponse::Broadcast(ChatMemo {
username: username.clone(),
Expand Down Expand Up @@ -96,10 +98,7 @@ pub async fn process_command(
ChatCommand::Leave(username) => {
remove_username(username.clone(), room_state.clone()).await;
debug!("User {} has left", username);
if let Some(handle) = room_state.task_handles.lock().await.remove(&username) {
info!("Aborting background task for user: {}", username);
handle.abort();
}

debug!("User {} has left so sending broadcast message", username);
send_to_broadcast_channel(
ChatResponse::Broadcast(ChatMemo {
Expand All @@ -117,40 +116,45 @@ pub async fn process_command(
}

pub async fn remove_username(username: String, room_state: Arc<RoomState>) {
let mut users = room_state.user_set.lock().await;
users.remove(&username);
let mut lookup = room_state.task_handles.lock().await;
if let Some(handle) = lookup.remove(&username) {
info!("Aborting background task for user: {}", username);
handle.abort();
}
info!("User {} removed from room", username);
// list connected users
let users: Vec<String> = users.iter().cloned().collect();
let users: Vec<String> = lookup.keys().cloned().collect();
info!("Users in room after removal: {:?}", users);
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

#[tokio::test]
async fn test_remove_username() {
let mut user_set = HashSet::new();
user_set.insert("test_user".to_string());
user_set.insert("other_user".to_string());
let mut lookup_initial = HashMap::new();
let dummy_task: JoinHandle<Result<(), RoomError>> = tokio::spawn(async { Ok(()) });
lookup_initial.insert("test_user".to_string(), dummy_task);
let dummy_task2: JoinHandle<Result<(), RoomError>> = tokio::spawn(async { Ok(()) });
lookup_initial.insert("other_user".to_string(), dummy_task2);

let (tx, _) = broadcast::channel(100);
let room_state = Arc::new(RoomState {
user_set: Mutex::new(user_set),
tx,
task_handles: Mutex::new(HashMap::new()),
task_handles: Mutex::new(lookup_initial),
});

// Execute removal
remove_username("test_user".to_string(), room_state.clone()).await;

// Verify user was removed
let users = room_state.user_set.lock().await;
assert!(!users.contains("test_user"));
assert!(users.contains("other_user"));
assert_eq!(users.len(), 1);
let lookup = room_state.task_handles.lock().await;
assert!(!lookup.contains_key("test_user"));
assert!(lookup.contains_key("other_user"));
assert_eq!(lookup.len(), 1);
}
}
3 changes: 1 addition & 2 deletions chatty-tcp/src/listen/state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::listen::command::RoomError;
use chatty_types::response::ChatResponse;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

type TaskHandleMap = Mutex<HashMap<String, JoinHandle<Result<(), RoomError>>>>;

pub struct RoomState {
pub user_set: Mutex<HashSet<String>>,
pub tx: broadcast::Sender<ChatResponse>,
pub task_handles: TaskHandleMap,
}
26 changes: 8 additions & 18 deletions chatty-tcp/tests/client_server_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chatty_tcp::listen::state::RoomState;
use chatty_types::config::setup_tracing;
use chatty_types::config::Component::Server;
use chatty_types::response::ChatResponse;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
Expand All @@ -25,15 +25,10 @@ fn init_tracing_for_tests() {
async fn single_client() {
init_tracing_for_tests();
// Set up room state
let user_set = Mutex::new(HashSet::new());
let (tx, _rx) = broadcast::channel::<ChatResponse>(100);
let task_handles = Mutex::new(HashMap::new());

let room_state = Arc::new(RoomState {
user_set,
tx,
task_handles,
});
let room_state = Arc::new(RoomState { tx, task_handles });

// Start the server in a background task
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
Expand Down Expand Up @@ -83,8 +78,8 @@ async fn single_client() {

// // Verify user is there
let room_state_for_removal = room_state.clone();
let users = room_state_for_removal.user_set.lock().await;
assert!(users.contains("alone"));
let lookup = room_state_for_removal.task_handles.lock().await;
assert!(lookup.contains_key("alone"));

// leave command
let leave_command = r#"{"Leave":"alone"}"#;
Expand All @@ -104,12 +99,7 @@ async fn multiple_clients() {
let task_handles = Mutex::new(HashMap::new());

// Set up room state
let user_set = Mutex::new(HashSet::new());
let room_state = Arc::new(RoomState {
user_set,
tx,
task_handles,
});
let room_state = Arc::new(RoomState { tx, task_handles });
let state = room_state.clone();

// Start the server in a background task
Expand Down Expand Up @@ -194,9 +184,9 @@ async fn multiple_clients() {
let expected_message2 = r#"{"Broadcast":{"username":"carl","content":"Left"}}"#;
assert_eq!(broadcast_message, expected_message2);

let user_set = state.user_set.lock().await;
assert_eq!(user_set.len(), 1);
assert!(user_set.contains("david"));
let lookup = state.task_handles.lock().await;
assert_eq!(lookup.len(), 1);
assert!(lookup.contains_key("david"));

// Clean up
server_handle.abort();
Expand Down
Binary file modified demo/Weather-Standup.mp4
Binary file not shown.
Loading