Skip to content

Commit f776489

Browse files
committed
Implement progress bar
1 parent 6a4fd80 commit f776489

File tree

16 files changed

+646
-632
lines changed

16 files changed

+646
-632
lines changed

p2p-cli/src/cli.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,7 @@ pub struct TransferParams {
8080
#[arg(long, value_parser = parse_bandwidth_arg, default_value = "0")]
8181
pub max_speed: u64,
8282

83-
/// Enable automatic reconnection on network failures (default: enabled, use --auto-reconnect=false to disable)
84-
#[arg(long, default_value = "true", action = clap::ArgAction::Set)]
85-
pub auto_reconnect: bool,
86-
87-
/// Maximum reconnection attempts (0 = unlimited)
83+
/// Maximum reconnection attempts on network failures (0 = unlimited, 1 = no retry)
8884
#[arg(long, default_value = "5")]
8985
pub max_retries: u32,
9086
}

p2p-cli/src/discover.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub async fn handle_discover(timeout_secs: u64, port: u16) -> Result<()> {
3232
// Get discovered peers
3333
let peers = manager.get_peers().await;
3434

35-
info!("\n📡 Discovered {} peer(s):", peers.len());
35+
info!("📡 Discovered {} peer(s):", peers.len());
3636
for (idx, peer) in peers.iter().enumerate() {
3737
info!(
3838
" [{}] {} - {} ({})",

p2p-cli/src/receive.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ pub async fn handle_receive(
4646
)
4747
.await?;
4848

49-
info!(" Session established");
49+
info!(" Session established");
5050
info!(" Peer: {}", session.peer_device_id());
5151
info!(" Compression: {}", session.config().compression_enabled);
5252

53-
info!("\n📁 Session ready - waiting for incoming transfers...");
53+
info!("📁 Session ready - waiting for incoming transfers...");
5454
info!(" (Press Ctrl+C to exit)");
5555

56-
// Run event loop - automatically receives incoming transfers
56+
// Run event loop - automatically receives incoming transfers with progress display
5757
// The loop continues until the peer closes the connection
58-
session.run_event_loop(&output, auto_accept).await?;
58+
session.run_event_loop(&output, auto_accept, true).await?;
5959

60-
info!("\n✅ Session ended");
60+
info!("✅ Session ended");
6161

6262
Ok(())
6363
}

p2p-cli/src/resume.rs

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
//! Resume operations
22
33
use anyhow::Result;
4-
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
54
use p2p_core::{
65
handshake::HandshakeClient,
76
network::tcp::TcpConnection,
87
protocol::{Capabilities, ConfigMessage},
9-
transfer_folder::{FolderProgress, FolderTransferSession, FolderTransferState},
8+
transfer_folder::{FolderTransferSession, FolderTransferState},
109
Uuid,
1110
};
1211
use std::{net::SocketAddr, path::PathBuf};
@@ -87,51 +86,32 @@ pub async fn handle_resume(transfer_id: String, to: String, path: PathBuf) -> Re
8786
});
8887
}));
8988

90-
// Set up progress callback
91-
let multi = MultiProgress::new();
89+
// Create progress state for unified progress tracking
90+
// Initialize with already completed bytes for resume
91+
let mut progress = p2p_core::progress::ProgressState::new(state.total_bytes);
92+
// Add the bytes already transferred
93+
progress.add_bytes(state.transferred_bytes);
9294

93-
let overall_pb = multi.add(ProgressBar::new(100));
94-
overall_pb.set_style(
95-
ProgressStyle::default_bar()
96-
.template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files ({percent}%)")
97-
.unwrap()
98-
.progress_chars("=>-"),
99-
);
100-
101-
let current_pb = multi.add(ProgressBar::new(100));
102-
current_pb.set_style(
103-
ProgressStyle::default_bar()
104-
.template(" Current: {msg} {bar:40.green/yellow} {bytes}/{total_bytes} ({percent}%)")
105-
.unwrap()
106-
.progress_chars("=>-"),
107-
);
108-
109-
session.set_progress_callback(Box::new(move |progress: FolderProgress| {
110-
overall_pb.set_length(progress.total_files as u64);
111-
overall_pb.set_position(progress.completed_files as u64);
112-
113-
if let Some(file) = &progress.current_file {
114-
current_pb.set_message(file.clone());
115-
current_pb.set_position((progress.current_file_progress * 100.0) as u64);
116-
}
95+
// Resume transfer with signal handling
96+
info!("📁 Resuming folder transfer...");
11797

118-
if progress.completed_files == progress.total_files {
119-
overall_pb.finish_with_message("Complete!");
120-
current_pb.finish_and_clear();
121-
}
122-
}));
98+
// Use the unified send_folder method with existing state
99+
let reconnect_config = p2p_core::reconnect::ReconnectConfig {
100+
max_attempts: 1, // Single attempt, no auto-reconnect for manual resume command
101+
initial_backoff_secs: 3,
102+
max_backoff_secs: 180,
103+
exponential: true,
104+
};
123105

124-
// Resume transfer with signal handling
125-
info!("\n📁 Resuming folder transfer...");
126106
tokio::select! {
127-
result = session.resume_send_folder(&path, &state) => {
107+
result = session.send_folder(&path, &reconnect_config, Some(&state_path), None, Some(&state), Some(&mut progress)) => {
128108
result?;
129109
let _ = tokio::fs::remove_file(&state_path).await;
130-
info!("\n✅ Transfer resumed and completed!");
110+
info!("✅ Transfer resumed and completed!");
131111
info!(" State file removed");
132112
}
133113
_ = signal::ctrl_c() => {
134-
warn!("\n⚠️ Transfer interrupted again. State has been saved.");
114+
warn!("⚠️ Transfer interrupted again. State has been saved.");
135115
info!(" Use 'p2p-transfer resume {} --to {} --path {}' to continue",
136116
transfer_id, to, path.display());
137117
return Ok(());

p2p-cli/src/send.rs

Lines changed: 49 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
//! Send operations
22
33
use anyhow::Result;
4-
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
54
use p2p_core::{
65
protocol::{Capabilities, ConfigMessage},
76
session::P2PSession,
8-
transfer_folder::{FolderProgress, FolderTransferState},
7+
transfer_folder::FolderTransferState,
98
Uuid,
109
};
1110
use std::path::{Path, PathBuf};
@@ -73,13 +72,13 @@ pub async fn handle_send(
7372
)
7473
.await?;
7574

76-
info!(" Session established");
75+
info!(" Session established");
7776
info!(" Peer: {}", session.peer_device_id());
7877
info!(" Capabilities: {:?}", session.capabilities());
7978

8079
// Send file or folder with signal handling (unified)
8180
let result = tokio::select! {
82-
result = send(&mut session, &path, config, transfer_params.auto_reconnect, transfer_params.max_retries) => {
81+
result = send(&mut session, &path, config, transfer_params.max_retries) => {
8382
result
8483
}
8584
_ = signal::ctrl_c() => {
@@ -89,11 +88,11 @@ pub async fn handle_send(
8988

9089
match result {
9190
Ok(_) => {
92-
info!("\n✅ Transfer complete!");
91+
info!("✅ Transfer complete!");
9392
Ok(())
9493
}
9594
Err(e) => {
96-
warn!("\n⚠️ Transfer interrupted: {}", e);
95+
warn!("⚠️ Transfer interrupted: {}", e);
9796
info!(" State has been saved. Use 'p2p-transfer resume <transfer-id>' to continue");
9897
Err(e)
9998
}
@@ -104,15 +103,14 @@ async fn send(
104103
session: &mut P2PSession,
105104
path: &Path,
106105
_config: ConfigMessage,
107-
auto_reconnect: bool,
108106
max_retries: u32,
109107
) -> Result<()> {
110108
let base_name = path.file_name().unwrap().to_string_lossy().to_string();
111109

112110
if path.is_file() {
113-
info!("\n📄 Sending file: {}", base_name);
111+
info!("📄 Sending file: {}", base_name);
114112
} else {
115-
info!("\n📁 Sending folder: {}", base_name);
113+
info!("📁 Sending folder: {}", base_name);
116114
}
117115

118116
let config = session.config();
@@ -125,15 +123,13 @@ async fn send(
125123
);
126124
}
127125

128-
if auto_reconnect {
129-
info!(
130-
" Auto-reconnect enabled (max retries: {})",
131-
if max_retries == 0 {
132-
"∞".to_string()
133-
} else {
134-
max_retries.to_string()
135-
}
136-
);
126+
// Display reconnection behavior based on max_retries
127+
if max_retries == 0 {
128+
info!(" Auto-reconnect: enabled (unlimited retries)");
129+
} else if max_retries == 1 {
130+
info!(" Auto-reconnect: disabled (no retry)");
131+
} else {
132+
info!(" Auto-reconnect: enabled (max {} retries)", max_retries);
137133
}
138134

139135
// Generate transfer ID for this operation
@@ -144,86 +140,45 @@ async fn send(
144140

145141
// Keep state in memory using Arc<Mutex> for thread-safe access
146142
let current_state = std::sync::Arc::new(std::sync::Mutex::new(None::<FolderTransferState>));
147-
148-
// Create multi-progress for overall and per-file progress
149-
let multi = MultiProgress::new();
150-
151-
let overall_pb = multi.add(ProgressBar::new(100));
152-
overall_pb.set_style(
153-
ProgressStyle::default_bar()
154-
.template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files ({percent}%)")
155-
.unwrap()
156-
.progress_chars("=>-"),
157-
);
158-
159-
let current_pb = multi.add(ProgressBar::new(100));
160-
current_pb.set_style(
161-
ProgressStyle::default_bar()
162-
.template(" Current: {msg} {bar:40.green/yellow} {bytes}/{total_bytes} ({percent}%)")
163-
.unwrap()
164-
.progress_chars("=>-"),
165-
);
166-
167-
// Create progress callback
168-
let progress_callback = Box::new(move |progress: FolderProgress| {
169-
// Update overall progress
170-
overall_pb.set_length(progress.total_files as u64);
171-
overall_pb.set_position(progress.completed_files as u64);
172-
173-
// Update current file progress
174-
if let Some(file) = &progress.current_file {
175-
current_pb.set_message(file.clone());
176-
current_pb.set_position((progress.current_file_progress * 100.0) as u64);
177-
}
178-
179-
// If all files complete, finish both bars
180-
if progress.completed_files == progress.total_files {
181-
overall_pb.finish_with_message("Complete!");
182-
current_pb.finish_and_clear();
143+
// Create progress state for unified progress tracking
144+
let mut progress = p2p_core::progress::ProgressState::new(0);
145+
146+
// Create state callback to update in-memory state
147+
let current_state_for_callback = current_state.clone();
148+
let state_callback = Box::new(move |state: &FolderTransferState| {
149+
if let Ok(mut guard) = current_state_for_callback.lock() {
150+
*guard = Some(state.clone());
183151
}
184152
});
185153

186-
// Send file or folder with state tracking (unified method)
187-
let result = if auto_reconnect {
188-
// Create state callback to update in-memory state
189-
let current_state_for_callback = current_state.clone();
190-
let state_callback = Box::new(move |state: &FolderTransferState| {
191-
if let Ok(mut guard) = current_state_for_callback.lock() {
192-
*guard = Some(state.clone());
193-
}
194-
});
195-
196-
// Use auto-reconnect wrapper with in-memory state
197-
let reconnect_config = p2p_core::reconnect::ReconnectConfig {
198-
max_attempts: max_retries,
199-
initial_backoff_secs: 3,
200-
max_backoff_secs: 180,
201-
exponential: true,
202-
};
154+
// Configure reconnection behavior
155+
let reconnect_config = p2p_core::reconnect::ReconnectConfig {
156+
max_attempts: max_retries,
157+
initial_backoff_secs: 3,
158+
max_backoff_secs: 180,
159+
exponential: true,
160+
};
203161

204-
// Create state provider closure that reads from in-memory state
205-
let state_for_reconnect = current_state.clone();
206-
let state_provider = Box::new(move || {
207-
state_for_reconnect
208-
.lock()
209-
.ok()
210-
.and_then(|guard| guard.clone())
211-
});
162+
// Create state provider closure that reads from in-memory state
163+
let state_for_reconnect = current_state.clone();
164+
let state_provider = Box::new(move || {
165+
state_for_reconnect
166+
.lock()
167+
.ok()
168+
.and_then(|guard| guard.clone())
169+
});
212170

213-
session
214-
.send_path_with_reconnect(
215-
path,
216-
Some(progress_callback),
217-
Some(state_callback),
218-
&reconnect_config,
219-
Some(&state_file),
220-
Some(state_provider),
221-
)
222-
.await
223-
} else {
224-
// Regular send without auto-reconnect - state stays in memory only
225-
session.send_path(path, Some(progress_callback)).await
226-
};
171+
// Send file or folder with state tracking (unified method)
172+
let result = session
173+
.send_path(
174+
path,
175+
Some(&mut progress),
176+
Some(state_callback),
177+
&reconnect_config,
178+
Some(&state_file),
179+
Some(state_provider),
180+
)
181+
.await;
227182

228183
match result {
229184
Ok(_) => {
@@ -243,7 +198,7 @@ async fn send(
243198
let _ = tokio::fs::remove_file(&state_file).await;
244199
}
245200
}
246-
info!("Transfer complete");
201+
info!("Transfer complete!");
247202
Ok(())
248203
}
249204
Err(e) => {

p2p-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ serde_json = "1.0.145"
2525
rand = "0.8"
2626
dirs = "5.0"
2727
local-ip-address = "0.6"
28+
indicatif = "0.17"
2829

2930
[dev-dependencies]
3031
tokio-test = "0.4"

p2p-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod handshake;
1212
pub mod history; // Transfer history tracking
1313
pub mod nat; // NAT traversal and hole punching
1414
pub mod network;
15+
pub mod progress; // Unified progress tracking
1516
pub mod protocol;
1617
pub mod reconnect; // Auto-reconnect with exponential backoff
1718
pub mod session; // High-level session management

0 commit comments

Comments
 (0)