Skip to content
This repository was archived by the owner on Oct 2, 2025. It is now read-only.

Commit 2d56928

Browse files
committed
add OCI resumability
1 parent 72b0302 commit 2d56928

File tree

5 files changed

+316
-151
lines changed

5 files changed

+316
-151
lines changed

src/archive.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ pub async fn extract_archive<P: AsRef<Path>>(path: P, output_dir: P) -> Result<(
4747
};
4848

4949
match format {
50-
ArchiveFormat::Zip => extract_zip(path, &output_dir)
50+
ArchiveFormat::Zip => extract_zip(path, output_dir)
5151
.await
52-
.map_err(|err| DownloadError::ZipError(err)),
53-
ArchiveFormat::Gz => extract_tar(path, &output_dir, flate2::read::GzDecoder::new).await,
54-
ArchiveFormat::Xz => extract_tar(path, &output_dir, xz2::read::XzDecoder::new).await,
55-
ArchiveFormat::Bz2 => extract_tar(path, &output_dir, bzip2::read::BzDecoder::new).await,
52+
.map_err(DownloadError::ZipError),
53+
ArchiveFormat::Gz => extract_tar(path, output_dir, flate2::read::GzDecoder::new).await,
54+
ArchiveFormat::Xz => extract_tar(path, output_dir, xz2::read::XzDecoder::new).await,
55+
ArchiveFormat::Bz2 => extract_tar(path, output_dir, bzip2::read::BzDecoder::new).await,
5656
ArchiveFormat::Zst => {
57-
extract_tar(path, &output_dir, |f| {
57+
extract_tar(path, output_dir, |f| {
5858
zstd::stream::read::Decoder::new(f).unwrap()
5959
})
6060
.await

src/downloader.rs

Lines changed: 56 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ use std::{
77
sync::{Arc, Mutex},
88
};
99

10-
use futures::{future::join_all, StreamExt};
10+
use futures::{future::join_all, StreamExt, TryStreamExt};
1111
use regex::Regex;
12-
use reqwest::header::{CONTENT_DISPOSITION, CONTENT_RANGE, ETAG, IF_RANGE, LAST_MODIFIED, RANGE};
13-
use serde::{Deserialize, Serialize};
12+
use reqwest::header::{HeaderMap, CONTENT_DISPOSITION, ETAG, LAST_MODIFIED};
1413

1514
use tokio::{
1615
fs::{self, OpenOptions},
@@ -25,6 +24,7 @@ use crate::{
2524
error::DownloadError,
2625
http_client::SHARED_CLIENT,
2726
oci::{OciClient, OciLayer, OciManifest, Reference},
27+
resume::ResumeSupport,
2828
utils::{
2929
build_absolute_path, default_prompt_confirm, extract_filename, extract_filename_from_url,
3030
is_elf, matches_pattern, FileMode,
@@ -41,12 +41,6 @@ pub enum DownloadState {
4141
Recovered,
4242
}
4343

44-
#[derive(Deserialize, Serialize)]
45-
pub struct Meta {
46-
etag: Option<String>,
47-
last_modified: Option<String>,
48-
}
49-
5044
pub struct DownloadOptions {
5145
pub url: String,
5246
pub output_path: Option<String>,
@@ -97,29 +91,28 @@ impl Downloader<'_> {
9791

9892
let hash_fallback = || {
9993
let mut hasher = blake3::Hasher::new();
100-
hasher.update(&options.url.as_bytes());
94+
hasher.update(options.url.as_bytes());
10195
let result = hasher.finalize();
10296
result.to_hex().to_string()
10397
};
10498

10599
let (mut provisional_path, final_dir) = if let Some(ref out) = options.output_path {
106100
if out.ends_with('/') {
107101
let dir = PathBuf::from(out);
108-
let base =
109-
extract_filename_from_url(&options.url).unwrap_or_else(|| hash_fallback());
102+
let base = extract_filename_from_url(&options.url).unwrap_or_else(hash_fallback);
110103
(dir.join(&base), Some(dir))
111104
} else {
112105
let p = PathBuf::from(out);
113106
if p.is_dir() {
114107
let base =
115-
extract_filename_from_url(&options.url).unwrap_or_else(|| hash_fallback());
108+
extract_filename_from_url(&options.url).unwrap_or_else(hash_fallback);
116109
(p.join(&base), Some(p))
117110
} else {
118111
(p, None)
119112
}
120113
}
121114
} else {
122-
let base = extract_filename_from_url(&options.url).unwrap_or_else(|| hash_fallback());
115+
let base = extract_filename_from_url(&options.url).unwrap_or_else(hash_fallback);
123116
(PathBuf::from(&base), None)
124117
};
125118

@@ -129,34 +122,25 @@ impl Downloader<'_> {
129122
}
130123
}
131124

132-
let part_path = provisional_path.with_extension("part");
133-
let meta_path = provisional_path.with_extension("part.meta");
125+
let (part_path, meta_path) = ResumeSupport::get_part_paths(&provisional_path);
126+
let (mut etag, mut last_modified) = ResumeSupport::read_metadata(&meta_path).await?;
134127

135-
let (mut etag, mut last_modified) = if fs::try_exists(&meta_path).await? {
136-
let data = fs::read_to_string(&meta_path).await?;
137-
let meta: Meta = serde_json::from_str(&data).unwrap();
138-
(meta.etag, meta.last_modified)
128+
let mut attempt = 0;
129+
let mut downloaded = if fs::try_exists(&part_path).await? {
130+
fs::metadata(&part_path).await?.len()
139131
} else {
140-
(None, None)
132+
0
141133
};
142134

143-
let mut attempt = 0;
144135
loop {
145-
let mut downloaded = if fs::try_exists(&part_path).await? {
146-
fs::metadata(&part_path).await?.len()
147-
} else {
148-
0
149-
};
136+
let mut headers = HeaderMap::new();
150137

151-
let mut request = self.client.get(url.clone());
152-
request = request.header(RANGE, format!("bytes={}-", downloaded));
153-
if let Some(ref etag) = etag {
154-
request = request.header(IF_RANGE, etag);
155-
}
156-
if let Some(ref last_modified) = last_modified {
157-
request = request.header(IF_RANGE, last_modified);
158-
}
159-
let response = request
138+
ResumeSupport::prepare_resume_headers(&mut headers, downloaded, &etag, &last_modified);
139+
140+
let response = self
141+
.client
142+
.get(url.clone())
143+
.headers(headers.clone())
160144
.send()
161145
.await
162146
.map_err(|err| DownloadError::NetworkError { source: err })?;
@@ -173,17 +157,20 @@ impl Downloader<'_> {
173157
.get(LAST_MODIFIED)
174158
.and_then(|h| h.to_str().ok())
175159
.map(String::from);
176-
let changed = (etag.is_some() && remote_etag.is_some() && etag != remote_etag)
177-
|| (last_modified.is_some()
178-
&& last_modified.is_some()
179-
&& last_modified != remote_modified);
180160

181-
// If Range not satisfiable or resource changed, clear and retry once
182-
if (status == reqwest::StatusCode::RANGE_NOT_SATISFIABLE || changed) && attempt == 0 {
161+
if ResumeSupport::should_restart_download(
162+
status,
163+
&etag,
164+
&last_modified,
165+
&remote_etag,
166+
&remote_modified,
167+
) && attempt == 0
168+
{
183169
fs::remove_file(&part_path).await.ok();
184170
fs::remove_file(&meta_path).await.ok();
185171
etag = remote_etag.clone();
186172
last_modified = remote_modified.clone();
173+
downloaded = 0;
187174
attempt += 1;
188175
continue;
189176
}
@@ -212,7 +199,7 @@ impl Downloader<'_> {
212199
.headers()
213200
.get(CONTENT_DISPOSITION)
214201
.and_then(|header| header.to_str().ok())
215-
.and_then(|header| extract_filename(header));
202+
.and_then(extract_filename);
216203

217204
if let Some(name) = header_name {
218205
provisional_path = if let Some(ref dir) = final_dir {
@@ -244,29 +231,16 @@ impl Downloader<'_> {
244231
}
245232
}
246233

247-
let total_size = response
248-
.headers()
249-
.get(CONTENT_RANGE)
250-
.and_then(|h| h.to_str().ok())
251-
.and_then(|range| range.rsplit_once('/').and_then(|(_, tot)| tot.parse().ok()))
252-
.or_else(|| response.content_length())
253-
.unwrap_or(0);
254-
255-
let should_truncate = response
256-
.headers()
257-
.get(CONTENT_RANGE)
258-
.and_then(|h| h.to_str().ok())
259-
.and_then(|r| r.split_whitespace().nth(1))
260-
.and_then(|range| range.split('/').next())
261-
.and_then(|se| se.split('-').next())
262-
.and_then(|s| s.parse::<u64>().ok())
263-
.map_or(false, |start| start != downloaded)
264-
|| status == reqwest::StatusCode::OK;
234+
let (should_truncate, total_size) =
235+
ResumeSupport::extract_range_info(&response, downloaded);
265236

266-
let mut stream = response.bytes_stream();
237+
if let Some(ref callback) = options.progress_callback {
238+
callback(DownloadState::Preparing(total_size));
239+
}
267240

268-
let mut file = if should_truncate {
241+
let mut file = if should_truncate || downloaded == 0 {
269242
fs::remove_file(&part_path).await.ok();
243+
downloaded = 0;
270244
OpenOptions::new()
271245
.create(true)
272246
.write(true)
@@ -281,28 +255,20 @@ impl Downloader<'_> {
281255
.await?
282256
};
283257

284-
let progress_callback = options.progress_callback;
285-
286-
if let Some(ref callback) = progress_callback {
287-
callback(DownloadState::Preparing(total_size));
288-
}
289-
290-
let meta = Meta {
291-
etag: remote_etag.clone(),
292-
last_modified: remote_modified.clone(),
293-
};
294-
fs::write(&meta_path, serde_json::to_string(&meta).unwrap()).await?;
258+
ResumeSupport::write_metadata(&meta_path, remote_etag, remote_modified).await?;
295259

296-
while let Some(chunk) = stream.next().await {
297-
let bytes = chunk.map_err(|err| DownloadError::NetworkError { source: err })?;
298-
file.write_all(&bytes).await?;
299-
downloaded = downloaded.saturating_add(bytes.len() as u64);
260+
let mut stream = response.bytes_stream();
261+
while let Some(chunk) = stream
262+
.try_next()
263+
.await
264+
.map_err(|_| DownloadError::ChunkError)?
265+
{
266+
file.write_all(&chunk).await?;
267+
downloaded += chunk.len() as u64;
300268

301-
if let Some(ref callback) = progress_callback {
269+
if let Some(ref callback) = options.progress_callback {
302270
callback(DownloadState::Progress(downloaded));
303271
}
304-
305-
downloaded += bytes.len() as u64;
306272
}
307273

308274
fs::rename(&part_path, &final_target).await?;
@@ -312,10 +278,6 @@ impl Downloader<'_> {
312278
fs::set_permissions(&final_target, Permissions::from_mode(0o755)).await?;
313279
}
314280

315-
if let Some(ref callback) = progress_callback {
316-
callback(DownloadState::Complete);
317-
}
318-
319281
if options.extract_archive {
320282
let extract_dir = match &options.extract_dir {
321283
Some(path) => {
@@ -334,6 +296,10 @@ impl Downloader<'_> {
334296
};
335297
archive::extract_archive(&final_target, &extract_dir).await?;
336298
}
299+
300+
if let Some(ref callback) = options.progress_callback {
301+
callback(DownloadState::Complete);
302+
}
337303
return Ok(final_target.to_string_lossy().into());
338304
}
339305
}
@@ -404,7 +370,11 @@ impl OciDownloader {
404370
let options = &self.options;
405371
let url = options.url.clone();
406372
let reference: Reference = url.into();
407-
let oci_client = OciClient::new(&reference, options.api.clone());
373+
let oci_client = OciClient::new(
374+
&reference,
375+
options.api.clone(),
376+
self.options.file_mode.clone(),
377+
);
408378

409379
if reference.tag.starts_with("sha256:") {
410380
return self.download_blob(oci_client).await;

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ pub mod gitlab;
66
pub mod http_client;
77
pub mod oci;
88
pub mod platform;
9+
pub mod resume;
910
pub mod utils;

0 commit comments

Comments
 (0)