Skip to content

Commit ca8b65f

Browse files
committed
Factor out consume_stream_and_write_to_file.
1 parent e418fd7 commit ca8b65f

File tree

1 file changed

+36
-27
lines changed

1 file changed

+36
-27
lines changed

wholesym/src/downloader.rs

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -293,11 +293,11 @@ impl PendingDownload {
293293
pub async fn download_to_file(
294294
self,
295295
dest_path: &Path,
296-
mut chunk_consumer: Option<&mut (dyn FnMut(&[u8]) + Send)>,
296+
chunk_consumer: Option<&mut (dyn FnMut(&[u8]) + Send)>,
297297
) -> Result<FileDownloadOutcome, DownloadError> {
298298
let PendingDownload {
299299
reporter,
300-
mut stream,
300+
stream,
301301
observer,
302302
ts_after_status,
303303
} = self;
@@ -322,31 +322,7 @@ impl PendingDownload {
322322
> = create_file_cleanly(
323323
dest_path,
324324
|dest_file: std::fs::File| async move {
325-
let mut dest_file = tokio::fs::File::from_std(dest_file);
326-
let mut buf = vec![0u8; 2 * 1024 * 1024 /* 2 MiB */];
327-
let mut uncompressed_size_in_bytes = 0;
328-
loop {
329-
let count = stream
330-
.read(&mut buf)
331-
.await
332-
.map_err(DownloadError::StreamRead)?;
333-
if count == 0 {
334-
break;
335-
}
336-
uncompressed_size_in_bytes += count as u64;
337-
dest_file
338-
.write_all(&buf[..count])
339-
.await
340-
.map_err(DownloadError::DiskWrite)?;
341-
if let Some(chunk_consumer) = &mut chunk_consumer {
342-
chunk_consumer(&buf[..count]);
343-
}
344-
}
345-
dest_file.flush().await.map_err(DownloadError::DiskWrite)?;
346-
Ok((
347-
FileDownloadOutcome::DidCreateNewFile,
348-
uncompressed_size_in_bytes,
349-
))
325+
consume_stream_and_write_to_file(stream, chunk_consumer, dest_file).await
350326
},
351327
|| async {
352328
let size = std::fs::metadata(dest_path)
@@ -451,3 +427,36 @@ impl PendingDownload {
451427
Ok(bytes)
452428
}
453429
}
430+
431+
#[allow(clippy::type_complexity)]
432+
async fn consume_stream_and_write_to_file(
433+
mut stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
434+
mut chunk_consumer: Option<&mut (dyn FnMut(&[u8]) + Send)>,
435+
dest_file: std::fs::File,
436+
) -> Result<(FileDownloadOutcome, u64), DownloadError> {
437+
let mut dest_file = tokio::fs::File::from_std(dest_file);
438+
let mut buf = vec![0u8; 2 * 1024 * 1024 /* 2 MiB */];
439+
let mut uncompressed_size_in_bytes = 0;
440+
loop {
441+
let count = stream
442+
.read(&mut buf)
443+
.await
444+
.map_err(DownloadError::StreamRead)?;
445+
if count == 0 {
446+
break;
447+
}
448+
uncompressed_size_in_bytes += count as u64;
449+
dest_file
450+
.write_all(&buf[..count])
451+
.await
452+
.map_err(DownloadError::DiskWrite)?;
453+
if let Some(chunk_consumer) = &mut chunk_consumer {
454+
chunk_consumer(&buf[..count]);
455+
}
456+
}
457+
dest_file.flush().await.map_err(DownloadError::DiskWrite)?;
458+
Ok((
459+
FileDownloadOutcome::DidCreateNewFile,
460+
uncompressed_size_in_bytes,
461+
))
462+
}

0 commit comments

Comments
 (0)