Skip to content

Commit c34a921

Browse files
committed
Add a ChunkConsumer trait.
1 parent ca8b65f commit c34a921

File tree

4 files changed

+65
-29
lines changed

4 files changed

+65
-29
lines changed

wholesym/src/breakpad.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::Arc;
44
use samply_symbols::{BreakpadIndexCreator, BreakpadParseError};
55
use tokio::io::{AsyncReadExt, AsyncWriteExt};
66

7-
use crate::downloader::{Downloader, DownloaderObserver, FileDownloadOutcome};
7+
use crate::downloader::{ChunkConsumer, Downloader, DownloaderObserver, FileDownloadOutcome};
88
use crate::file_creation::{create_file_cleanly, CleanFileCreationError};
99
use crate::DownloadError;
1010

@@ -160,15 +160,14 @@ impl BreakpadSymbolDownloaderInner {
160160

161161
let observer = self.observer.clone();
162162
let download = self.downloader.initiate_download(&url, observer).await?;
163-
let mut index_generator = BreakpadIndexCreator::new();
164-
let mut consumer = |chunk: &[u8]| index_generator.consume(chunk);
163+
let index_generator = BreakpadIndexCreatorChunkConsumer(BreakpadIndexCreator::new());
165164
let outcome = download
166-
.download_to_file(&dest_path, Some(&mut consumer))
165+
.download_to_file_with_chunk_consumer(&dest_path, index_generator)
167166
.await?;
168167

169168
match outcome {
170-
FileDownloadOutcome::DidCreateNewFile => {
171-
if let Ok(index) = index_generator.finish() {
169+
FileDownloadOutcome::DidCreateNewFile(index_result) => {
170+
if let Ok(index) = index_result {
172171
if let Some(symindex_path) = self.symindex_path(rel_path) {
173172
let _ = self.write_symindex(&symindex_path, index).await;
174173
}
@@ -289,3 +288,17 @@ impl BreakpadSymbolDownloaderInner {
289288
.map_err(SymindexGenerationError::BreakpadParsing)
290289
}
291290
}
291+
292+
struct BreakpadIndexCreatorChunkConsumer(BreakpadIndexCreator);
293+
294+
impl ChunkConsumer for BreakpadIndexCreatorChunkConsumer {
295+
type Output = Result<Vec<u8>, BreakpadParseError>;
296+
297+
fn consume_chunk(&mut self, chunk_data: &[u8]) {
298+
self.0.consume(chunk_data);
299+
}
300+
301+
fn finish(self) -> Self::Output {
302+
self.0.finish()
303+
}
304+
}

wholesym/src/debuginfod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl ManualDebuginfodDownloader {
163163
.initiate_download(&url, self.observer.clone())
164164
.await
165165
.ok()?;
166-
download.download_to_file(&dest_path, None).await.ok()?;
166+
download.download_to_file(&dest_path).await.ok()?;
167167

168168
Some(dest_path)
169169
}

wholesym/src/downloader.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,20 @@ impl Drop for DownloadStatusReporter {
161161
}
162162
}
163163

164+
pub trait ChunkConsumer {
165+
type Output;
166+
fn consume_chunk(&mut self, chunk_data: &[u8]);
167+
fn finish(self) -> Self::Output;
168+
}
169+
170+
struct NoopChunkConsumer;
171+
172+
impl ChunkConsumer for NoopChunkConsumer {
173+
type Output = ();
174+
fn consume_chunk(&mut self, _chunk_data: &[u8]) {}
175+
fn finish(self) -> Self::Output {}
176+
}
177+
164178
pub struct Downloader {
165179
reqwest_client: Result<reqwest::Client, reqwest::Error>,
166180
}
@@ -283,18 +297,29 @@ pub struct PendingDownload {
283297
ts_after_status: Instant,
284298
}
285299

286-
pub enum FileDownloadOutcome {
287-
DidCreateNewFile,
300+
pub enum FileDownloadOutcome<T> {
301+
DidCreateNewFile(T),
288302
FoundExistingFile,
289303
}
290304

291305
impl PendingDownload {
292-
#[allow(clippy::type_complexity)]
293306
pub async fn download_to_file(
294307
self,
295308
dest_path: &Path,
296-
chunk_consumer: Option<&mut (dyn FnMut(&[u8]) + Send)>,
297-
) -> Result<FileDownloadOutcome, DownloadError> {
309+
) -> Result<FileDownloadOutcome<()>, DownloadError> {
310+
self.download_to_file_with_chunk_consumer(dest_path, NoopChunkConsumer)
311+
.await
312+
}
313+
314+
pub async fn download_to_file_with_chunk_consumer<C, O>(
315+
self,
316+
dest_path: &Path,
317+
chunk_consumer: C,
318+
) -> Result<FileDownloadOutcome<O>, DownloadError>
319+
where
320+
C: ChunkConsumer<Output = O> + Send + 'static,
321+
O: Send + 'static,
322+
{
298323
let PendingDownload {
299324
reporter,
300325
stream,
@@ -317,7 +342,7 @@ impl PendingDownload {
317342
}
318343

319344
let download_result: Result<
320-
(FileDownloadOutcome, u64),
345+
(FileDownloadOutcome<C::Output>, u64),
321346
CleanFileCreationError<DownloadError>,
322347
> = create_file_cleanly(
323348
dest_path,
@@ -371,10 +396,7 @@ impl PendingDownload {
371396

372397
#[allow(clippy::type_complexity)]
373398
#[allow(dead_code)]
374-
pub async fn download_to_memory(
375-
self,
376-
mut chunk_consumer: Option<&mut (dyn FnMut(&[u8]) + Send)>,
377-
) -> Result<Vec<u8>, DownloadError> {
399+
pub async fn download_to_memory(self) -> Result<Vec<u8>, DownloadError> {
378400
let PendingDownload {
379401
reporter,
380402
mut stream,
@@ -399,9 +421,6 @@ impl PendingDownload {
399421
}
400422
uncompressed_size_in_bytes += count as u64;
401423
bytes_ref.extend_from_slice(&buf[..count]);
402-
if let Some(chunk_consumer) = &mut chunk_consumer {
403-
chunk_consumer(&buf[..count]);
404-
}
405424
}
406425
Ok(uncompressed_size_in_bytes)
407426
}
@@ -428,12 +447,15 @@ impl PendingDownload {
428447
}
429448
}
430449

431-
#[allow(clippy::type_complexity)]
432-
async fn consume_stream_and_write_to_file(
450+
async fn consume_stream_and_write_to_file<C, O>(
433451
mut stream: Pin<Box<dyn AsyncRead + Send + Sync>>,
434-
mut chunk_consumer: Option<&mut (dyn FnMut(&[u8]) + Send)>,
452+
mut chunk_consumer: C,
435453
dest_file: std::fs::File,
436-
) -> Result<(FileDownloadOutcome, u64), DownloadError> {
454+
) -> Result<(FileDownloadOutcome<O>, u64), DownloadError>
455+
where
456+
C: ChunkConsumer<Output = O> + Send + 'static,
457+
O: Send + 'static,
458+
{
437459
let mut dest_file = tokio::fs::File::from_std(dest_file);
438460
let mut buf = vec![0u8; 2 * 1024 * 1024 /* 2 MiB */];
439461
let mut uncompressed_size_in_bytes = 0;
@@ -446,17 +468,18 @@ async fn consume_stream_and_write_to_file(
446468
break;
447469
}
448470
uncompressed_size_in_bytes += count as u64;
471+
chunk_consumer.consume_chunk(&buf[..count]);
449472
dest_file
450473
.write_all(&buf[..count])
451474
.await
452475
.map_err(DownloadError::DiskWrite)?;
453-
if let Some(chunk_consumer) = &mut chunk_consumer {
454-
chunk_consumer(&buf[..count]);
455-
}
456476
}
477+
478+
let chunk_consumer_output = chunk_consumer.finish();
457479
dest_file.flush().await.map_err(DownloadError::DiskWrite)?;
480+
458481
Ok((
459-
FileDownloadOutcome::DidCreateNewFile,
482+
FileDownloadOutcome::DidCreateNewFile(chunk_consumer_output),
460483
uncompressed_size_in_bytes,
461484
))
462485
}

wholesym/src/helper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ impl Helper {
384384
.downloader
385385
.initiate_download(&url, Some(self.observer.clone()))
386386
.await?;
387-
let bytes = download.download_to_memory(None).await?;
387+
let bytes = download.download_to_memory().await?;
388388
return Ok(WholesymFileContents::Bytes(bytes.into()));
389389
}
390390
WholesymFileLocation::SymsrvFile(filename, hash) => {

0 commit comments

Comments
 (0)