Skip to content

Commit 42dbf55

Browse files
feat(storage): implement storage repository to write file during FerrisShare protocol (dev-sys-do#6)
* feat(storage): add StorageError enum with Display implementation and update StorageRepository trait * feat(storage): implement FSStorageRepository with file handling and sanitization * feat(command): update CommandService trait to include process_binary_data method * feat(network): make command_service public in NetworkServiceImpl * feat(application): implement FerrisShareState struct with command and network services * refactor(network): remove unused HashSet import from entities.rs * feat(main): refactor main function to initialize services with storage repository * feat(storage): add Display implementation for StorageError enum * feat(command): implement From trait for CommandError to String conversion * feat(command): update execute_protocol_command to return OkHousten and enhance process_binary_data with error handling * refactor(network): handle process write binary data and closing connection on sender close it * feat(network): enhance TransferState to track current file during data transfer
1 parent afd6866 commit 42dbf55

File tree

18 files changed

+392
-74
lines changed

18 files changed

+392
-74
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use crate::core::domain::{command::ports::CommandService, network::ports::NetworkService};
2+
3+
#[derive(Clone, Copy)]
4+
pub struct FerrisShareState<C, N>
5+
where
6+
C: CommandService,
7+
N: NetworkService,
8+
{
9+
pub command_service: C,
10+
pub network_service: N,
11+
}
12+
13+
impl<C, N> FerrisShareState<C, N>
14+
where
15+
C: CommandService + Clone + Send + Sync + 'static,
16+
N: NetworkService + Clone + Send + Sync + 'static,
17+
{
18+
pub fn new(command_service: C, network_service: N) -> Self {
19+
FerrisShareState {
20+
command_service,
21+
network_service,
22+
}
23+
}
24+
}

src/application/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod ferrisshare_state;

src/core/domain/command/entities.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,12 @@ pub enum CommandError {
33
InvalidCommand,
44
ExecutionFailed(String),
55
}
6+
7+
impl From<CommandError> for String {
8+
fn from(err: CommandError) -> Self {
9+
match err {
10+
CommandError::InvalidCommand => "Invalid command".to_string(),
11+
CommandError::ExecutionFailed(msg) => format!("Command execution failed: {}", msg),
12+
}
13+
}
14+
}

src/core/domain/command/ports.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,10 @@ pub trait CommandService: Send + Sync {
1010
&self,
1111
state: Arc<tokio::sync::Mutex<TransferState>>,
1212
msg: &ProtocolMessage,
13-
) -> impl Future<Output = Result<ProtocolMessage, CommandError>> + Send;
13+
) -> impl Future<Output = Result<ProtocolMessage, CommandError>> + Send + Sync;
14+
fn process_binary_data(
15+
&self,
16+
state: Arc<tokio::sync::Mutex<TransferState>>,
17+
data: &[u8],
18+
) -> impl Future<Output = Result<(), CommandError>>;
1419
}

src/core/domain/command/services.rs

Lines changed: 103 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,30 @@ use std::sync::Arc;
33
use crate::core::domain::{
44
command::{entities::CommandError, ports::CommandService},
55
network::entities::{ProtocolMessage, TransferState},
6-
storage::entities::YeetBlock,
6+
storage::ports::StorageRepository,
77
};
88

99
#[derive(Clone)]
10-
pub struct CommandServiceImpl {}
10+
pub struct CommandServiceImpl<C>
11+
where
12+
C: StorageRepository,
13+
{
14+
storage: C,
15+
}
1116

12-
impl CommandServiceImpl {
13-
pub fn new() -> Self {
14-
CommandServiceImpl {}
17+
impl<C> CommandServiceImpl<C>
18+
where
19+
C: StorageRepository + Clone + Send + Sync + 'static,
20+
{
21+
pub fn new(storage: C) -> Self {
22+
CommandServiceImpl { storage }
1523
}
1624
}
1725

18-
impl CommandService for CommandServiceImpl {
26+
impl<C> CommandService for CommandServiceImpl<C>
27+
where
28+
C: StorageRepository + Clone + Send + Sync + 'static,
29+
{
1930
async fn execute_protocol_command(
2031
&self,
2132
state: Arc<tokio::sync::Mutex<TransferState>>,
@@ -34,6 +45,7 @@ impl CommandService for CommandServiceImpl {
3445
expected_blocks
3546
);
3647
*state_guard = TransferState::Receiving {
48+
current_file: _filename.clone(),
3749
expected_blocks,
3850
focused_block: None,
3951
received_blocks: Vec::with_capacity(expected_blocks as usize),
@@ -45,18 +57,25 @@ impl CommandService for CommandServiceImpl {
4557
}
4658
ProtocolMessage::Yeet(yeet_block) => {
4759
let mut state_guard = state.lock().await;
48-
let (expected_blocks, focused_block, received_blocks) = match &mut *state_guard {
49-
TransferState::Receiving {
50-
expected_blocks,
51-
focused_block,
52-
received_blocks,
53-
} => (expected_blocks, focused_block, received_blocks),
54-
_ => {
55-
return Err(CommandError::ExecutionFailed(
56-
"Error transfer state is not equal Receiving".to_string(),
57-
));
58-
}
59-
};
60+
let (expected_blocks, focused_block, received_blocks, current_file) =
61+
match &mut *state_guard {
62+
TransferState::Receiving {
63+
expected_blocks,
64+
focused_block,
65+
received_blocks,
66+
current_file,
67+
} => (
68+
expected_blocks,
69+
focused_block,
70+
received_blocks,
71+
current_file,
72+
),
73+
_ => {
74+
return Err(CommandError::ExecutionFailed(
75+
"Error transfer state is not equal Receiving".to_string(),
76+
));
77+
}
78+
};
6079

6180
// Ensure we don't exceed the expected number of blocks.
6281
if received_blocks.len() >= *expected_blocks as usize {
@@ -77,17 +96,30 @@ impl CommandService for CommandServiceImpl {
7796

7897
// Reuse the mutable guard to update the state without locking again.
7998
*state_guard = TransferState::Receiving {
99+
current_file: current_file.clone(),
80100
expected_blocks: *expected_blocks,
81101
focused_block: Some(yeet_block.clone()),
82102
received_blocks: received_blocks.clone(),
83103
};
84104

85105
drop(state_guard);
86106

87-
Ok(ProtocolMessage::Ok)
107+
Ok(ProtocolMessage::OkHousten(yeet_block.index))
88108
}
89109
ProtocolMessage::MissionAccomplished => {
90110
let mut state_guard = state.lock().await;
111+
let current_file = match &*state_guard {
112+
TransferState::Receiving { current_file, .. } => current_file,
113+
_ => {
114+
return Err(CommandError::ExecutionFailed(
115+
"Error transfer state is not equal Receiving".to_string(),
116+
));
117+
}
118+
};
119+
120+
self.storage.finalize(current_file).await.map_err(|e| {
121+
CommandError::ExecutionFailed(format!("Storage error: {:?}", e))
122+
})?;
91123
*state_guard = TransferState::Finished;
92124
drop(state_guard);
93125
Ok(ProtocolMessage::Success)
@@ -99,4 +131,56 @@ impl CommandService for CommandServiceImpl {
99131
_ => Err(CommandError::InvalidCommand),
100132
}
101133
}
134+
135+
async fn process_binary_data(
136+
&self,
137+
state: Arc<tokio::sync::Mutex<TransferState>>,
138+
data: &[u8],
139+
) -> Result<(), CommandError> {
140+
let mut state_guard = state.lock().await;
141+
let (expected_blocks, focused_block, received_blocks, current_file) =
142+
match &mut *state_guard {
143+
TransferState::Receiving {
144+
expected_blocks,
145+
focused_block,
146+
received_blocks,
147+
current_file,
148+
} => (
149+
expected_blocks,
150+
focused_block,
151+
received_blocks,
152+
current_file,
153+
),
154+
_ => {
155+
return Err(CommandError::ExecutionFailed(
156+
"Error transfer state is not equal Receiving".to_string(),
157+
));
158+
}
159+
};
160+
if let Some(focused_block) = focused_block {
161+
if received_blocks.contains(&focused_block.index) {
162+
println!("Block {} already received, ignoring.", focused_block.index);
163+
} else {
164+
println!("Stored binary data block: {:?}", focused_block);
165+
self.storage
166+
.write_block(current_file, focused_block, data)
167+
.await
168+
.map_err(|e| {
169+
CommandError::ExecutionFailed(format!("Storage error: {:?}", e))
170+
})?;
171+
172+
received_blocks.push(focused_block.index);
173+
*state_guard = TransferState::Receiving {
174+
current_file: current_file.clone(),
175+
expected_blocks: *expected_blocks,
176+
focused_block: None,
177+
received_blocks: received_blocks.clone(),
178+
};
179+
}
180+
} else {
181+
println!("No focused block to store data for.");
182+
}
183+
184+
Ok(())
185+
}
102186
}

src/core/domain/network/entities.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashSet, convert::TryFrom};
1+
use std::convert::TryFrom;
22

33
use crate::core::domain::storage::entities::YeetBlock;
44

@@ -169,6 +169,7 @@ impl From<NetworkError> for String {
169169
pub enum TransferState {
170170
Idle,
171171
Receiving {
172+
current_file: String,
172173
expected_blocks: u64,
173174
focused_block: Option<YeetBlock>,
174175
received_blocks: Vec<u64>,

src/core/domain/network/ports.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pub trait NetworkService {
1212
addr: &str,
1313
tx: Sender<TcpStream>,
1414
) -> impl Future<Output = Result<(), NetworkError>> + Send;
15-
fn handler(&self, rx: Receiver<TcpStream>) -> impl Future<Output = Result<(), Error>> + Send;
15+
fn handler(&self, rx: Receiver<TcpStream>) -> impl Future<Output = Result<(), Error>>;
1616
fn trust_protocol(
1717
&self,
1818
stream: &mut TcpStream,

src/core/domain/network/services.rs

Lines changed: 23 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub struct NetworkServiceImpl<C>
2222
where
2323
C: CommandService,
2424
{
25-
command_service: C,
25+
pub command_service: C,
2626
active: Arc<AtomicBool>,
2727
transfer_state: Arc<Mutex<TransferState>>,
2828
}
@@ -133,59 +133,38 @@ where
133133
.send_message(stream_ref, ProtocolMessage::Error(String::from(e)))
134134
.await;
135135
}
136-
}
137-
Err(e) => {
138-
let mut state_guard = self.transfer_state.lock().await;
139-
let was_receiving = match &mut *state_guard {
140-
TransferState::Receiving {
141-
expected_blocks,
142-
focused_block,
143-
received_blocks,
144-
} => {
145-
println!(
146-
"In Receiving Binary from YeetBlock: expected_blocks={}, focused_block={:?}, received_blocks={:?}, binary data={}",
147-
expected_blocks, focused_block, received_blocks, line
148-
);
149-
150-
// TODO: Implement storage of binary data block here
151136

152-
if let Some(focused_block) = focused_block {
153-
if received_blocks.contains(&focused_block.index) {
154-
println!(
155-
"Block {} already received, ignoring.",
156-
focused_block.index
157-
);
158-
} else {
159-
println!("Stored binary data block: {:?}", focused_block);
160-
received_blocks.push(focused_block.index);
161-
*state_guard = TransferState::Receiving {
162-
expected_blocks: *expected_blocks,
163-
focused_block: None,
164-
received_blocks: received_blocks.clone(),
165-
};
166-
}
167-
} else {
168-
println!("No focused block to store data for.");
169-
}
137+
let guard = self.transfer_state.lock().await;
138+
match *guard {
139+
TransferState::Closed => {
140+
println!("Closing connection.");
141+
self.active
142+
.store(false, std::sync::atomic::Ordering::SeqCst);
143+
drop(guard);
170144

171-
drop(state_guard);
145+
self.reset_transfer_state().await;
146+
let _ = stream_ref.shutdown().await;
172147

173-
true
148+
break;
174149
}
175-
_ => false,
176-
};
177-
178-
if !was_receiving {
179-
eprintln!("Failed to parse message: {:?}", e);
150+
_ => {
151+
continue;
152+
}
153+
}
154+
}
155+
Err(_) => {
156+
if let Err(e) = self
157+
.command_service
158+
.process_binary_data(Arc::clone(&self.transfer_state), &buf)
159+
.await
160+
{
161+
eprintln!("Error processing binary data: {:?}", e);
180162
let _ = self
181163
.send_message(stream_ref, ProtocolMessage::Error(String::from(e)))
182164
.await;
183165
}
184166
}
185167
}
186-
187-
println!("Received line: {}", line);
188-
// handle msg here...
189168
}
190169
}
191170

src/core/domain/storage/entities.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,32 @@ pub struct File {
2020
pub name: String,
2121
pub size: u64,
2222
}
23+
24+
#[derive(Debug)]
25+
pub enum StorageError {
26+
FileNotFound,
27+
PermissionDenied,
28+
AlreadyExists,
29+
AbsolutePathNotAllowed,
30+
ParentDirSegmentNotAllowed,
31+
InvalidFilename,
32+
Unknown(String),
33+
}
34+
35+
impl From<StorageError> for String {
36+
fn from(error: StorageError) -> Self {
37+
match error {
38+
StorageError::FileNotFound => "File not found".into(),
39+
StorageError::PermissionDenied => "Permission denied".into(),
40+
StorageError::AlreadyExists => "File already exists".into(),
41+
StorageError::AbsolutePathNotAllowed => {
42+
"Absolute paths are not allowed in filenames".into()
43+
}
44+
StorageError::ParentDirSegmentNotAllowed => {
45+
"Parent directory segments are not allowed in filenames".into()
46+
}
47+
StorageError::InvalidFilename => "Invalid filename".into(),
48+
StorageError::Unknown(msg) => format!("Unknown storage error: {}", msg),
49+
}
50+
}
51+
}

src/core/domain/storage/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
pub mod entities;
2+
pub mod ports;

0 commit comments

Comments
 (0)