Skip to content

Commit 698a375

Browse files
committed
chore: Refactor and modularize server code
Signed-off-by: rrouviere <[email protected]>
1 parent 5984ce6 commit 698a375

File tree

5 files changed

+138
-110
lines changed

5 files changed

+138
-110
lines changed

topics/p2p-transfer-protocol/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ fn server_mode(bind_addr: &str, file_path: &str) -> io::Result<()> {
8787
);
8888

8989
// Wait for client
90+
// FIXME: We handle multiple clients, but since the output file is shared among them, this is unhelpful.
9091
for stream in listener.incoming() {
9192
match stream {
9293
Ok(mut s) => {

topics/p2p-transfer-protocol/src/protocol/message.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ impl FromStr for Message {
3232
if upper.starts_with("HELLO") {
3333
let parts: Vec<&str> = s.split_whitespace().collect();
3434
if parts.len() == 2
35-
&& let Ok(file_size) = parts[1].parse::<u64>() {
36-
return Ok(Message::Hello { file_size });
37-
}
35+
&& let Ok(file_size) = parts[1].parse::<u64>()
36+
{
37+
return Ok(Message::Hello { file_size });
38+
}
3839
error!("Invalid HELLO message");
3940
return Err(());
4041
};
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use log::info;
2+
use std::fs::File;
3+
use std::io::{self, Write, copy};
4+
use std::net::TcpStream;
5+
6+
use crate::protocol::StateMachine;
7+
use crate::protocol::connection_state::ConnectionState;
8+
use crate::protocol::message::Message;
9+
10+
use crate::server::events::handle_message_loop;
11+
12+
pub(crate) fn handle_connection(file: &mut File, stream: &mut TcpStream) -> Result<(), io::Error> {
13+
// Intialize the state machine
14+
let mut state_machine = StateMachine::new();
15+
state_machine.transition(ConnectionState::Listening);
16+
17+
// Start the negociation message loop.
18+
handle_message_loop(&mut state_machine, stream)?;
19+
20+
// Now we're in Established state, receive the file data.
21+
let expected_file_size = state_machine.file_size.unwrap_or(0);
22+
23+
receive_file(file, stream, expected_file_size)
24+
}
25+
26+
pub fn send_message(stream: &mut TcpStream, msg: &Message) -> Result<(), io::Error> {
27+
let msg_str = msg.to_string();
28+
stream.write_all(msg_str.as_bytes())?;
29+
Ok(())
30+
}
31+
32+
pub fn receive_file(
33+
file: &mut File,
34+
stream: &mut TcpStream,
35+
expected_file_size: u64,
36+
) -> io::Result<()> {
37+
let mut sized_stream = io::Read::take(stream, expected_file_size);
38+
39+
let bytes_copied = copy(&mut sized_stream, file)?;
40+
file.flush()?;
41+
42+
if bytes_copied != expected_file_size {
43+
return Err(io::Error::new(
44+
io::ErrorKind::InvalidData,
45+
format!(
46+
"Expected {} bytes, but received {} bytes",
47+
expected_file_size, bytes_copied
48+
),
49+
));
50+
}
51+
52+
info!(
53+
"File received successfully, {} bytes written.",
54+
bytes_copied
55+
);
56+
Ok(())
57+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
use log::{debug, error, info, warn};
2+
use std::io::{self, BufRead, BufReader};
3+
use std::net::TcpStream;
4+
5+
use crate::protocol::StateMachine;
6+
use crate::protocol::connection_state::ConnectionState;
7+
use crate::protocol::message::Message;
8+
use crate::server::connection::send_message;
9+
10+
fn on_hello(file_size: u64, state_machine: &mut StateMachine) -> Message {
11+
info!("Client wants to send a file of size {} bytes", file_size);
12+
state_machine.transition(ConnectionState::HelloReceived);
13+
14+
state_machine.file_size = Some(file_size);
15+
16+
// TODO: Add a basic chec k to handle the NACK case.
17+
18+
// send ack
19+
state_machine.transition(ConnectionState::ACKSent);
20+
21+
Message::Ack
22+
}
23+
24+
fn on_message(msg: Message, state_machine: &mut StateMachine) -> Option<Message> {
25+
match msg {
26+
Message::Hello { file_size } => Some(on_hello(file_size, state_machine)),
27+
Message::Send => {
28+
info!("Client is starting to send data.");
29+
state_machine.transition(ConnectionState::Established);
30+
None
31+
}
32+
33+
_ => {
34+
warn!("Unexpected message from client: {:?}", msg);
35+
None
36+
}
37+
}
38+
}
39+
pub(crate) fn handle_message_loop(
40+
state_machine: &mut StateMachine,
41+
stream: &mut TcpStream,
42+
) -> Result<(), io::Error> {
43+
let reader_stream = stream.try_clone().expect("Failed to clone stream");
44+
let writer_stream = stream;
45+
let reader = BufReader::new(reader_stream);
46+
47+
for line in reader.lines() {
48+
let message = match line {
49+
Ok(line) => {
50+
debug!("Received: {}", line);
51+
line.parse::<Message>()
52+
}
53+
Err(e) => {
54+
error!("Error reading line: {}", e);
55+
continue;
56+
}
57+
};
58+
59+
// Process message
60+
// FIXME: Do not use unwrap, fix the result type mess.
61+
if let Some(resp) = on_message(message.unwrap(), state_machine) {
62+
send_message(writer_stream, &resp)?
63+
}
64+
65+
// If we are in Established state, stop the line-based logic and receive the data.
66+
if let ConnectionState::Established = state_machine.current_state() {
67+
break;
68+
}
69+
}
70+
Ok(())
71+
}
Lines changed: 5 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,117 +1,15 @@
1-
use log::{debug, error, info, warn};
21
use std::fs::File;
3-
use std::io::{self, BufRead, BufReader, Write, copy};
2+
use std::io;
43
use std::net::TcpStream;
54

6-
use crate::protocol::StateMachine;
7-
use crate::protocol::connection_state::ConnectionState;
8-
use crate::protocol::message::Message;
5+
use crate::server::connection::handle_connection;
96

10-
fn on_hello(file_size: u64, state_machine: &mut StateMachine) -> Message {
11-
info!("Client wants to send a file of size {} bytes", file_size);
12-
state_machine.transition(ConnectionState::HelloReceived);
13-
14-
state_machine.file_size = Some(file_size);
15-
16-
// TODO: Add a basic check to handle the NACK case.
17-
18-
// send ack
19-
state_machine.transition(ConnectionState::ACKSent);
20-
21-
Message::Ack
22-
}
23-
24-
fn on_message(msg: Message, state_machine: &mut StateMachine) -> Option<Message> {
25-
match msg {
26-
Message::Hello { file_size } => Some(on_hello(file_size, state_machine)),
27-
Message::Send => {
28-
info!("Client is starting to send data.");
29-
state_machine.transition(ConnectionState::Established);
30-
None
31-
}
32-
33-
_ => {
34-
warn!("Unexpected message from client: {:?}", msg);
35-
None
36-
}
37-
}
38-
}
39-
40-
fn send_message(stream: &mut TcpStream, msg: &Message) -> Result<(), io::Error> {
41-
let msg_str = msg.to_string();
42-
stream.write_all(msg_str.as_bytes())?;
43-
Ok(())
44-
}
7+
mod connection;
8+
mod events;
459

4610
pub fn run_server(file: &mut File, stream: &mut TcpStream) -> Result<(), io::Error> {
4711
let peer_addr = stream.peer_addr()?;
4812
println!("New connection from {peer_addr}");
4913

50-
// Intialize the state machine
51-
let mut state_machine = StateMachine::new();
52-
state_machine.transition(ConnectionState::Listening);
53-
54-
// Start the negociation message loop.
55-
message_loop(&mut state_machine, stream)?;
56-
57-
// Now we're in Established state, receive the file data.
58-
let expected_file_size = state_machine.file_size.unwrap_or(0);
59-
60-
receive_file(file, stream, expected_file_size)
61-
}
62-
63-
fn receive_file(
64-
file: &mut File,
65-
stream: &mut TcpStream,
66-
expected_file_size: u64,
67-
) -> io::Result<()> {
68-
let mut sized_stream = io::Read::take(stream, expected_file_size);
69-
70-
let bytes_copied = copy(&mut sized_stream, file)?;
71-
file.flush()?;
72-
73-
if bytes_copied != expected_file_size {
74-
return Err(io::Error::new(
75-
io::ErrorKind::InvalidData,
76-
format!(
77-
"Expected {} bytes, but received {} bytes",
78-
expected_file_size, bytes_copied
79-
),
80-
));
81-
}
82-
83-
info!(
84-
"File received successfully, {} bytes written.",
85-
bytes_copied
86-
);
87-
Ok(())
88-
}
89-
90-
fn message_loop(state_machine: &mut StateMachine, stream: &mut TcpStream) -> Result<(), io::Error> {
91-
let reader_stream = stream.try_clone().expect("Failed to clone stream");
92-
let writer_stream = stream;
93-
let reader = BufReader::new(reader_stream);
94-
95-
for line in reader.lines() {
96-
let message = match line {
97-
Ok(line) => {
98-
debug!("Received: {}", line);
99-
line.parse::<Message>()
100-
}
101-
Err(e) => {
102-
error!("Error reading line: {}", e);
103-
continue;
104-
}
105-
};
106-
107-
// Process message
108-
// FIXME: Do not use unwrap, fix the result type mess.
109-
if let Some(resp) = on_message(message.unwrap(), state_machine) { send_message(writer_stream, &resp)? }
110-
111-
// If we are in Established state, stop the line-based logic and receive the data.
112-
if let ConnectionState::Established = state_machine.current_state() {
113-
break;
114-
}
115-
}
116-
Ok(())
14+
handle_connection(file, stream)
11715
}

0 commit comments

Comments
 (0)