Skip to content

Commit 587816c

Browse files
committed
code cleanups
1 parent f77789e commit 587816c

File tree

4 files changed

+27
-59
lines changed

4 files changed

+27
-59
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/fls/block_writer.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub(crate) struct BlockWriter {
7878
buffer_pos: usize, // Current position in buffer
7979
bytes_written: u64,
8080
bytes_since_sync: u64, // Track bytes written since last sync
81-
written_tx: mpsc::UnboundedSender<u64>,
81+
written_progress_tx: mpsc::UnboundedSender<u64>,
8282
use_direct_io: bool, // Track if O_DIRECT is active
8383
#[allow(dead_code)]
8484
debug: bool, // Debug mode flag
@@ -91,7 +91,7 @@ impl BlockWriter {
9191
/// Open a block device for writing with direct I/O
9292
pub(crate) fn new(
9393
device: &str,
94-
written_tx: mpsc::UnboundedSender<u64>,
94+
written_progress_tx: mpsc::UnboundedSender<u64>,
9595
debug: bool,
9696
o_direct: bool,
9797
) -> io::Result<Self> {
@@ -222,7 +222,7 @@ impl BlockWriter {
222222
buffer_pos: 0,
223223
bytes_written: 0,
224224
bytes_since_sync: 0,
225-
written_tx,
225+
written_progress_tx,
226226
use_direct_io,
227227
debug,
228228
})
@@ -254,7 +254,7 @@ impl BlockWriter {
254254

255255
// Send progress update periodically (every 256KB to reduce overhead)
256256
if self.bytes_written.is_multiple_of(256 * 1024) {
257-
let _ = self.written_tx.send(self.bytes_written);
257+
let _ = self.written_progress_tx.send(self.bytes_written);
258258
}
259259
}
260260

@@ -304,7 +304,7 @@ impl BlockWriter {
304304
self.file.sync_all()?;
305305
self.bytes_since_sync = 0;
306306
// Send final progress update
307-
let _ = self.written_tx.send(self.bytes_written);
307+
let _ = self.written_progress_tx.send(self.bytes_written);
308308
Ok(())
309309
}
310310

@@ -324,7 +324,7 @@ impl AsyncBlockWriter {
324324
/// Create a new async block writer
325325
pub(crate) fn new(
326326
device: String,
327-
written_tx: mpsc::UnboundedSender<u64>,
327+
written_progress_tx: mpsc::UnboundedSender<u64>,
328328
debug: bool,
329329
o_direct: bool,
330330
) -> io::Result<Self> {
@@ -333,7 +333,7 @@ impl AsyncBlockWriter {
333333
// Spawn blocking task for I/O operations
334334
let writer_handle = tokio::task::spawn_blocking(move || {
335335
let mut writer =
336-
BlockWriter::new(&device, written_tx, debug, o_direct).map_err(|e| {
336+
BlockWriter::new(&device, written_progress_tx, debug, o_direct).map_err(|e| {
337337
eprintln!("Failed to open device '{}': {}", device, e);
338338
e
339339
})?;

src/fls/decompress.rs

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
1+
use tokio::io::AsyncReadExt;
22
use tokio::process::{Child, Command};
33
use tokio::sync::mpsc;
44

@@ -49,38 +49,6 @@ pub(crate) async fn start_decompressor_process(
4949
Ok((process, cmd))
5050
}
5151

52-
#[allow(dead_code)]
53-
pub(crate) async fn spawn_decompressor_stdout_reader(
54-
mut stdout: tokio::process::ChildStdout,
55-
decompressed_tx: mpsc::UnboundedSender<Vec<u8>>,
56-
mut dd_stdin: tokio::process::ChildStdin,
57-
error_tx: mpsc::UnboundedSender<String>,
58-
) {
59-
let mut buffer = [0u8; 8 * 1024 * 1024]; // 8MB buffer for better performance
60-
loop {
61-
match stdout.read(&mut buffer).await {
62-
Ok(0) => break, // EOF
63-
Ok(n) => {
64-
// Send to progress tracking
65-
if decompressed_tx.send(buffer[..n].to_vec()).is_err() {
66-
break;
67-
}
68-
// Write to dd stdin
69-
if let Err(e) = dd_stdin.write_all(&buffer[..n]).await {
70-
let _ = error_tx.send(format!("Error writing to dd stdin: {}", e));
71-
break;
72-
}
73-
}
74-
Err(e) => {
75-
let _ = error_tx.send(format!("Error reading from decompressor stdout: {}", e));
76-
break;
77-
}
78-
}
79-
}
80-
// Close dd stdin when decompressor is done
81-
let _ = dd_stdin.shutdown().await;
82-
}
83-
8452
pub(crate) async fn spawn_stderr_reader(
8553
mut stderr: tokio::process::ChildStderr,
8654
error_tx: mpsc::UnboundedSender<String>,

src/fls/from_url.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ pub async fn flash_from_url(
2424
let decompressor_stderr = decompressor.stderr.take().unwrap();
2525

2626
// Create channels
27-
let (decompressed_tx, mut decompressed_rx) = mpsc::unbounded_channel::<u64>();
27+
let (decompressed_progress_tx, mut decompressed_progress_rx) = mpsc::unbounded_channel::<u64>();
2828
let (error_tx, error_rx) = mpsc::unbounded_channel::<String>();
29-
let (written_tx, mut written_rx) = mpsc::unbounded_channel::<u64>();
29+
let (written_progress_tx, mut written_progress_rx) = mpsc::unbounded_channel::<u64>();
3030

3131
println!("Opening block device for writing: {}", options.device);
3232

3333
// Create block writer
3434
let block_writer = AsyncBlockWriter::new(
3535
options.device.clone(),
36-
written_tx,
36+
written_progress_tx,
3737
options.debug,
3838
options.o_direct,
3939
)?;
@@ -52,7 +52,7 @@ pub async fn flash_from_url(
5252
Ok(n) => {
5353
let data = buffer[..n].to_vec();
5454
// Send byte count to progress tracking
55-
if decompressed_tx.send(n as u64).is_err() {
55+
if decompressed_progress_tx.send(n as u64).is_err() {
5656
break;
5757
}
5858
// Write to block device
@@ -111,7 +111,7 @@ pub async fn flash_from_url(
111111
let (buffer_tx, mut buffer_rx) = mpsc::channel::<bytes::Bytes>(buffer_capacity);
112112

113113
// Channels for tracking bytes actually written to decompressor
114-
let (decompressor_written_tx, mut decompressor_written_rx) = mpsc::unbounded_channel::<u64>();
114+
let (decompressor_written_progress_tx, mut decompressor_written_progress_rx) = mpsc::unbounded_channel::<u64>();
115115

116116
// Spawn persistent task to write buffered chunks to decompressor
117117
let decompressor_writer_handle = tokio::spawn(async move {
@@ -121,7 +121,7 @@ pub async fn flash_from_url(
121121
return Err(format!("Error writing to decompressor stdin: {}", e));
122122
}
123123
// Notify that bytes were written to decompressor
124-
let _ = decompressor_written_tx.send(chunk_len);
124+
let _ = decompressor_written_progress_tx.send(chunk_len);
125125
}
126126
// Close decompressor stdin when channel is closed
127127
Ok::<(), String>(())
@@ -220,7 +220,7 @@ pub async fn flash_from_url(
220220
retry_count = 0; // Reset retry count on successful download
221221

222222
// Track bytes actually written to decompressor
223-
while let Ok(written_len) = decompressor_written_rx.try_recv() {
223+
while let Ok(written_len) = decompressor_written_progress_rx.try_recv() {
224224
bytes_sent_to_decompressor += written_len;
225225
progress.bytes_sent_to_decompressor += written_len;
226226
}
@@ -234,11 +234,11 @@ pub async fn flash_from_url(
234234
}
235235

236236
// Update progress from other channels
237-
while let Ok(byte_count) = decompressed_rx.try_recv() {
237+
while let Ok(byte_count) = decompressed_progress_rx.try_recv() {
238238
progress.bytes_decompressed += byte_count;
239239
}
240240

241-
while let Ok(written_bytes) = written_rx.try_recv() {
241+
while let Ok(written_bytes) = written_progress_rx.try_recv() {
242242
progress.bytes_written = written_bytes;
243243
}
244244

@@ -338,18 +338,18 @@ pub async fn flash_from_url(
338338
// Update progress from all channels
339339
let mut updated = false;
340340

341-
while let Ok(written_len) = decompressor_written_rx.try_recv() {
341+
while let Ok(written_len) = decompressor_written_progress_rx.try_recv() {
342342
bytes_sent_to_decompressor += written_len;
343343
progress.bytes_sent_to_decompressor += written_len;
344344
updated = true;
345345
}
346346

347-
while let Ok(byte_count) = decompressed_rx.try_recv() {
347+
while let Ok(byte_count) = decompressed_progress_rx.try_recv() {
348348
progress.bytes_decompressed += byte_count;
349349
updated = true;
350350
}
351351

352-
while let Ok(written_bytes) = written_rx.try_recv() {
352+
while let Ok(written_bytes) = written_progress_rx.try_recv() {
353353
progress.bytes_written = written_bytes;
354354
updated = true;
355355
}
@@ -381,7 +381,7 @@ pub async fn flash_from_url(
381381
}
382382

383383
// Update any remaining progress
384-
while let Ok(byte_count) = decompressed_rx.try_recv() {
384+
while let Ok(byte_count) = decompressed_progress_rx.try_recv() {
385385
progress.bytes_decompressed += byte_count;
386386
}
387387

@@ -413,12 +413,12 @@ pub async fn flash_from_url(
413413
// Update progress from channels
414414
let mut updated = false;
415415

416-
while let Ok(byte_count) = decompressed_rx.try_recv() {
416+
while let Ok(byte_count) = decompressed_progress_rx.try_recv() {
417417
progress.bytes_decompressed += byte_count;
418418
updated = true;
419419
}
420420

421-
while let Ok(written_bytes) = written_rx.try_recv() {
421+
while let Ok(written_bytes) = written_progress_rx.try_recv() {
422422
progress.bytes_written = written_bytes;
423423
updated = true;
424424
}
@@ -466,7 +466,7 @@ pub async fn flash_from_url(
466466
// Update progress from channels
467467
let mut updated = false;
468468

469-
while let Ok(written_bytes) = written_rx.try_recv() {
469+
while let Ok(written_bytes) = written_progress_rx.try_recv() {
470470
progress.bytes_written = written_bytes;
471471
updated = true;
472472
}
@@ -508,11 +508,11 @@ pub async fn flash_from_url(
508508
}
509509

510510
// Read any remaining progress updates
511-
while let Ok(byte_count) = decompressed_rx.try_recv() {
511+
while let Ok(byte_count) = decompressed_progress_rx.try_recv() {
512512
progress.bytes_decompressed += byte_count;
513513
}
514514

515-
while let Ok(written_bytes) = written_rx.try_recv() {
515+
while let Ok(written_bytes) = written_progress_rx.try_recv() {
516516
progress.bytes_written = written_bytes;
517517
}
518518

0 commit comments

Comments
 (0)