Skip to content

Commit 0670468

Browse files
downloader: Add config for download manager
1 parent 9251524 commit 0670468

File tree

3 files changed

+60
-11
lines changed

3 files changed

+60
-11
lines changed

src/downloader/config.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::sync::{
2+
atomic::{AtomicUsize, Ordering},
3+
Arc,
4+
};
5+
6+
#[derive(Debug, Clone)]
7+
pub struct DownloadManagerConfig {
8+
max_concurrent: Arc<AtomicUsize>,
9+
queue_size: usize,
10+
}
11+
12+
impl Default for DownloadManagerConfig {
13+
fn default() -> Self {
14+
Self {
15+
max_concurrent: Arc::new(AtomicUsize::new(3)),
16+
queue_size: 100,
17+
}
18+
}
19+
}
20+
21+
impl DownloadManagerConfig {
22+
pub fn queue_size(&self) -> usize {
23+
self.queue_size
24+
}
25+
26+
pub fn max_concurrent(&self) -> usize {
27+
self.max_concurrent.load(Ordering::Relaxed)
28+
}
29+
30+
pub fn set_max_concurrent(&self, max: usize) {
31+
self.max_concurrent.store(max, Ordering::Relaxed);
32+
}
33+
}

src/downloader/manager.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
1-
use super::{download_thread, DownloadHandle, DownloadRequest};
1+
use super::{config::DownloadManagerConfig, download_thread, DownloadHandle, DownloadRequest};
22
use crate::{error::DownloadError, Error};
33
use reqwest::{Client, Url};
44
use std::{path::Path, sync::Arc};
55
use tokio::sync::{mpsc, Semaphore};
66
use tokio_util::sync::CancellationToken;
77

8-
const QUEUE_SIZE: usize = 100;
9-
108
#[derive(Debug)]
119
pub struct DownloadManager {
1210
queue: mpsc::Sender<DownloadRequest>,
1311
semaphore: Arc<Semaphore>,
1412
cancel: CancellationToken,
13+
config: DownloadManagerConfig,
1514
}
1615

1716
impl Drop for DownloadManager {
@@ -21,15 +20,22 @@ impl Drop for DownloadManager {
2120
}
2221
}
2322

23+
impl Default for DownloadManager {
24+
fn default() -> Self {
25+
Self::with_config(DownloadManagerConfig::default())
26+
}
27+
}
28+
2429
impl DownloadManager {
25-
pub fn new(limit: usize) -> Self {
26-
let (tx, rx) = mpsc::channel(QUEUE_SIZE);
30+
pub fn with_config(config: DownloadManagerConfig) -> Self {
31+
let (tx, rx) = mpsc::channel(config.queue_size());
2732
let client = Client::new();
28-
let semaphore = Arc::new(Semaphore::new(limit));
33+
let semaphore = Arc::new(Semaphore::new(config.max_concurrent()));
2934
let manager = Self {
3035
queue: tx,
3136
semaphore: semaphore.clone(),
3237
cancel: CancellationToken::new(),
38+
config,
3339
};
3440
// Spawn the dispatcher thread to handle download requests
3541
tokio::spawn(async move { dispatcher_thread(client, rx, semaphore).await });
@@ -63,16 +69,24 @@ impl DownloadManager {
6369
Ok(handle)
6470
}
6571

66-
pub fn set_max_parallel_downloads(&self, limit: usize) {
67-
let current = self.semaphore.available_permits();
72+
pub async fn set_max_parallel_downloads(&self, limit: usize) -> Result<(), Error> {
73+
let current = self.config.max_concurrent();
6874
if limit > current {
6975
self.semaphore.add_permits(limit - current);
7076
} else if limit < current {
7177
let to_remove = current - limit;
72-
for _ in 0..to_remove {
73-
let _ = self.semaphore.try_acquire();
74-
}
78+
79+
let permits = self
80+
.semaphore
81+
.acquire_many(to_remove as u32)
82+
.await
83+
.map_err(|_| Error::Download(DownloadError::ManagerShutdown))?;
84+
85+
permits.forget();
7586
}
87+
self.config.set_max_concurrent(limit);
88+
89+
Ok(())
7690
}
7791

7892
pub fn cancel_all(&self) {

src/downloader/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
mod config;
12
mod handle;
23
mod manager;
34
mod progress;
45
mod types;
56
mod worker;
67

8+
pub use config::DownloadManagerConfig;
79
pub use handle::DownloadHandle;
810
pub use manager::*;
911
pub use progress::DownloadProgress;

0 commit comments

Comments
 (0)